Skip to content
Snippets Groups Projects
Commit 4601f11c authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Uc2 hazelcastjet: change window time to Duration

parent da545d35
Branches
Tags
1 merge request!275Refactor hazelcast jet benchmarks:
...@@ -2,6 +2,8 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; ...@@ -2,6 +2,8 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
...@@ -39,17 +41,16 @@ public class HistoryService extends HazelcastJetService { ...@@ -39,17 +41,16 @@ public class HistoryService extends HazelcastJetService {
final String kafkaOutputTopic = final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
// Transform minutes to milliseconds final Duration downsampleInterval = Duration.ofMinutes(
final int downsampleInterval = Integer.parseInt( Integer.parseInt(config.getProperty(
config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()); ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()));
final int downsampleIntervalMs = downsampleInterval * 60_000;
this.pipelineFactory = new Uc2PipelineFactory( this.pipelineFactory = new Uc2PipelineFactory(
kafkaProps, kafkaProps,
this.kafkaInputTopic, this.kafkaInputTopic,
kafkaWriteProps, kafkaWriteProps,
kafkaOutputTopic, kafkaOutputTopic,
downsampleIntervalMs); downsampleInterval);
} }
@Override @Override
......
...@@ -11,17 +11,19 @@ import com.hazelcast.jet.pipeline.Sinks; ...@@ -11,17 +11,19 @@ import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
public class Uc2PipelineFactory extends PipelineFactory { public class Uc2PipelineFactory extends PipelineFactory {
private final int downsampleIntervalInMs; private final Duration downsampleInterval;
/** /**
* Factory for uc2 pipelines. * Factory for uc2 pipelines.
...@@ -38,10 +40,10 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -38,10 +40,10 @@ public class Uc2PipelineFactory extends PipelineFactory {
final String kafkaInputTopic, final String kafkaInputTopic,
final Properties kafkaWritePropsForPipeline, final Properties kafkaWritePropsForPipeline,
final String kafkaOutputTopic, final String kafkaOutputTopic,
final int downsampleIntervalInMs) { final Duration downsampleIntervalInMs) {
super(kafkaReadPropsForPipeline, kafkaInputTopic, super(kafkaReadPropsForPipeline, kafkaInputTopic,
kafkaWritePropsForPipeline,kafkaOutputTopic); kafkaWritePropsForPipeline,kafkaOutputTopic);
this.downsampleIntervalInMs = downsampleIntervalInMs; this.downsampleInterval = downsampleIntervalInMs;
} }
/** /**
...@@ -91,7 +93,7 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -91,7 +93,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
.withNativeTimestamps(0) .withNativeTimestamps(0)
.setLocalParallelism(1) .setLocalParallelism(1)
.groupingKey(record -> record.getValue().getIdentifier()) .groupingKey(record -> record.getValue().getIdentifier())
.window(WindowDefinition.tumbling(downsampleIntervalInMs)) .window(WindowDefinition.tumbling(downsampleInterval.toMillis()))
.aggregate(this.uc2AggregateOperation()) .aggregate(this.uc2AggregateOperation())
.map(agg -> { .map(agg -> {
final String theKey = agg.key(); final String theKey = agg.key();
......
...@@ -12,6 +12,8 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException; ...@@ -12,6 +12,8 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.Assertions;
import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.test.SerialTest; import com.hazelcast.jet.test.SerialTest;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
...@@ -44,7 +46,7 @@ public class Uc2PipelineTest extends JetTestSupport { ...@@ -44,7 +46,7 @@ public class Uc2PipelineTest extends JetTestSupport {
final int testItemsPerSecond = 1; final int testItemsPerSecond = 1;
final String testSensorName = "TEST-SENSOR"; final String testSensorName = "TEST-SENSOR";
final Double testValueInW = 10.0; final Double testValueInW = 10.0;
final int testWindowInMs = 5000; final Duration testWindow = Duration.ofSeconds(5);
// Create mock jet instance with configuration // Create mock jet instance with configuration
final String testClusterName = randomName(); final String testClusterName = randomName();
...@@ -65,7 +67,7 @@ public class Uc2PipelineTest extends JetTestSupport { ...@@ -65,7 +67,7 @@ public class Uc2PipelineTest extends JetTestSupport {
// Create pipeline to test // Create pipeline to test
final Properties properties = new Properties(); final Properties properties = new Properties();
final Uc2PipelineFactory factory = new Uc2PipelineFactory( final Uc2PipelineFactory factory = new Uc2PipelineFactory(
properties,"",properties,"", testWindowInMs); properties,"",properties,"", testWindow);
this.uc2Topology = factory.extendUc2Topology(testSource); this.uc2Topology = factory.extendUc2Topology(testSource);
this.testPipeline = factory.getPipe(); this.testPipeline = factory.getPipe();
...@@ -84,9 +86,8 @@ public class Uc2PipelineTest extends JetTestSupport { ...@@ -84,9 +86,8 @@ public class Uc2PipelineTest extends JetTestSupport {
// Assertion // Assertion
this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout, this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout,
collection -> Assert.assertTrue( collection -> Assert.assertEquals("Not the right amount items in Stats Object!",
"Not the right amount items in Stats Object!", expectedOutput, collection.get(collection.size() - 1).getValue())));
collection.get(collection.size() - 1).getValue().equals(expectedOutput))));
// Run the test! // Run the test!
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment