diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java index 47bd04a12da3cc72744a8dc0503154015486c2bf..8e1f9f7058dced1ee50a5b1c1737e6846cf6525e 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java @@ -18,8 +18,6 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; */ public class ChildParentsTransformer { - public ChildParentsTransformer() {} - /** * Constructs a map of keys to their set of parents out of a SensorRegistry. * diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 97ea33eda56f34d5f1a2f8e5def8373c259540d0..1d1ac89692ca70fb0582f51fdcc8692d2f034208 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -53,16 +53,20 @@ public class HistoryService extends HazelcastJetService { final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); - final Duration windowSize = Duration.ofMillis( + final Duration emirPeriod = Duration.ofMillis( this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); + final Duration gracePeriod = Duration.ofMillis( + this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); + this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, kafkaConfigReadProps, kafkaAggregationReadProps, kafkaWriteProps, this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic, - windowSize); + emirPeriod, + gracePeriod); } @Override diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java index 6c4c63396208cafc68a83835f29609b8582370ca..7a85c2dc5fc65a7374f3d19ef7c9e561acd6ec13 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java @@ -12,6 +12,7 @@ public class Uc4ConfigurationKeys { public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; public static final String EMIT_PERIOD_MS = "emit.period.ms"; - // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index 59b5941fb9f0090074869b00d49ad26c68e40165..db7fdfce06790a047b80a392843c93d3c0f467d9 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -45,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory { private final Duration emitPeriod; + private final Duration gracePeriod; + /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. @@ -61,7 +63,7 @@ public class Uc4PipelineFactory extends PipelineFactory { * @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline. * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline. - * @param windowSize The window size of the tumbling window used in this pipeline. + * @param emitPeriod The window size of the tumbling window used in this pipeline. */ public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD final Properties kafkaConfigPropsForPipeline, @@ -71,7 +73,8 @@ public class Uc4PipelineFactory extends PipelineFactory { final String kafkaOutputTopic, final String kafkaConfigurationTopic, final String kafkaFeedbackTopic, - final Duration windowSize) { + final Duration emitPeriod, + final Duration gracePeriod) { super(kafkaInputReadPropsForPipeline, kafkaInputTopic, kafkaWritePropsForPipeline, kafkaOutputTopic); @@ -79,7 +82,8 @@ public class Uc4PipelineFactory extends PipelineFactory { this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaConfigurationTopic = kafkaConfigurationTopic; this.kafkaFeedbackTopic = kafkaFeedbackTopic; - this.emitPeriod = windowSize; + this.emitPeriod = emitPeriod; + this.gracePeriod = gracePeriod; } /** @@ -155,13 +159,13 @@ public class Uc4PipelineFactory extends PipelineFactory { ////////////////////////////////// // (1) Configuration Stream this.pipe.readFrom(configurationSource) - .withNativeTimestamps(0) + .withNativeTimestamps(this.gracePeriod.toMillis()) .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()))) .flatMapStateful(HashMap::new, new ConfigFlatMap()) .writeTo(Sinks.mapWithUpdating( - SENSOR_PARENT_MAP_NAME, // The addressed IMAP + SENSOR_PARENT_MAP_NAME, // The addressed IMap Entry::getKey, // The key to look for (oldValue, newEntry) -> newEntry.getValue())); @@ -169,13 +173,13 @@ public class Uc4PipelineFactory extends PipelineFactory { // (1) Sensor Input Stream final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe .readFrom(inputSource) - .withNativeTimestamps(0); + .withNativeTimestamps(this.gracePeriod.toMillis()); ////////////////////////////////// // (1) Aggregation Stream final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe .readFrom(aggregationSource) - .withNativeTimestamps(0) + .withNativeTimestamps(this.gracePeriod.toMillis()) .map(entry -> { // Map Aggregated to ActivePowerRecord final AggregatedActivePowerRecord agg = entry.getValue(); final ActivePowerRecord record = new ActivePowerRecord( @@ -214,17 +218,11 @@ public class Uc4PipelineFactory extends PipelineFactory { final ActivePowerRecord record = entry.getValue().getRecord(); final Set<String> groups = entry.getValue().getGroups(); - // Transformed Data - final String[] groupList = groups.toArray(String[]::new); - final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; - final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>(); - for (int i = 0; i < groupList.length; i++) { - newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); - newEntryList.add(Util.entry(newKeyList[i], record)); - } - // Return traversable list of new entry elements - return Traversers.traverseIterable(newEntryList); + return Traversers.traverseStream( + groups + .stream() + .map(group -> Util.entry(new SensorGroupKey(keyGroupId, group), record))); }); ////////////////////////////////// @@ -253,7 +251,8 @@ public class Uc4PipelineFactory extends PipelineFactory { return windowedLastValues .groupingKey(entry -> entry.getKey().getGroup()) - .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue())); + .aggregate(aggrOp) + .map(agg -> Util.entry(agg.getKey(), agg.getValue())); } @@ -270,7 +269,7 @@ public class Uc4PipelineFactory extends PipelineFactory { final Map<String, Set<String>> flatMapStage, final Entry<Event, SensorRegistry> eventItem) { // Transform new Input - final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); + final ChildParentsTransformer transformer = new ChildParentsTransformer(); final Map<String, Set<String>> mapFromRegistry = transformer.constructChildParentsPairs(eventItem.getValue()); diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties index af877044b6e17665b6a18af41ec72ab6cedf0f91..6fa35fa124939c72b0312c7f632a11b699b43c85 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties @@ -10,4 +10,4 @@ kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 emit.period.ms=5000 -#grace.period.ms=0 +grace.period.ms=0 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java index 29a561d1bd039f70b2540014f970a03094418532..7248791a572d81316dce48950a0ec36e96e21e74 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java @@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport { final Properties properties = new Properties(); final Uc4PipelineFactory factory = new Uc4PipelineFactory( properties, properties, properties, properties, "", "", - "", "", testWindowSize); + "", "", testWindowSize, Duration.ofMillis(0)); this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);