diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index 4c097c5789d438dccca14dbb827d23c2c227e15c..7fbcc1cfe52eafcdeb1741b8f888ffce1e40909b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -2,6 +2,8 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.google.common.math.StatsAccumulator; import io.confluent.kafka.serializers.KafkaAvroDeserializer; + +import java.time.Duration; import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -39,17 +41,16 @@ public class HistoryService extends HazelcastJetService { final String kafkaOutputTopic = config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - // Transform minutes to milliseconds - final int downsampleInterval = Integer.parseInt( - config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()); - final int downsampleIntervalMs = downsampleInterval * 60_000; + final Duration downsampleInterval = Duration.ofMinutes( + Integer.parseInt(config.getProperty( + ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString())); this.pipelineFactory = new Uc2PipelineFactory( kafkaProps, this.kafkaInputTopic, kafkaWriteProps, kafkaOutputTopic, - downsampleIntervalMs); + downsampleInterval); } @Override diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index b4c8fa662eab7a68a165fcbb474b6293d29202be..cc3ed394feffd4492b125a4bb085e21109ce61dc 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -11,17 +11,19 @@ import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; +import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; +import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier; + public class Uc2PipelineFactory extends PipelineFactory { - private final int downsampleIntervalInMs; + private final Duration downsampleInterval; /** * Factory for uc2 pipelines. @@ -38,10 +40,10 @@ public class Uc2PipelineFactory extends PipelineFactory { final String kafkaInputTopic, final Properties kafkaWritePropsForPipeline, final String kafkaOutputTopic, - final int downsampleIntervalInMs) { + final Duration downsampleIntervalInMs) { super(kafkaReadPropsForPipeline, kafkaInputTopic, kafkaWritePropsForPipeline,kafkaOutputTopic); - this.downsampleIntervalInMs = downsampleIntervalInMs; + this.downsampleInterval = downsampleIntervalInMs; } /** @@ -91,7 +93,7 @@ public class Uc2PipelineFactory extends PipelineFactory { .withNativeTimestamps(0) .setLocalParallelism(1) .groupingKey(record -> record.getValue().getIdentifier()) - .window(WindowDefinition.tumbling(downsampleIntervalInMs)) + .window(WindowDefinition.tumbling(downsampleInterval.toMillis())) .aggregate(this.uc2AggregateOperation()) .map(agg -> { final String theKey = agg.key(); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java index 00e3019bfba84625981107b4229c337a8ceeff83..8461d9cebe9e9556744777d0626cfeca5152b13d 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java @@ -12,6 +12,8 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; + +import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -44,7 +46,7 @@ public class Uc2PipelineTest extends JetTestSupport { final int testItemsPerSecond = 1; final String testSensorName = "TEST-SENSOR"; final Double testValueInW = 10.0; - final int testWindowInMs = 5000; + final Duration testWindow = Duration.ofSeconds(5); // Create mock jet instance with configuration final String testClusterName = randomName(); @@ -65,7 +67,7 @@ public class Uc2PipelineTest extends JetTestSupport { // Create pipeline to test final Properties properties = new Properties(); final Uc2PipelineFactory factory = new Uc2PipelineFactory( - properties,"",properties,"", testWindowInMs); + properties,"",properties,"", testWindow); this.uc2Topology = factory.extendUc2Topology(testSource); this.testPipeline = factory.getPipe(); @@ -84,9 +86,8 @@ public class Uc2PipelineTest extends JetTestSupport { // Assertion this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout, - collection -> Assert.assertTrue( - "Not the right amount items in Stats Object!", - collection.get(collection.size() - 1).getValue().equals(expectedOutput)))); + collection -> Assert.assertEquals("Not the right amount items in Stats Object!", + expectedOutput, collection.get(collection.size() - 1).getValue()))); // Run the test! try {