diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index 44e82126d365e3d373194160cca3b63cd1c46c98..4c726f523efc2b8b7320941ff2fe0d3a1c1d7acf 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -32,6 +32,7 @@ public class ConfigurationKeys { // UC4 public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - public static final String WINDOW_SIZE_UC4 = "window.size"; + public static final String EMIT_PERIOD_MS = "window.size"; + // public static final String EMIT_PERIOD_MS = "emit.period.ms"; } diff --git a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties index 547b92a3a19dda930e582878752c4eeaacee6a04..633b5084d03caf481ca4b4902fb0bbd5dee982ef 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties @@ -6,13 +6,10 @@ kafka.input.topic=input kafka.output.topic=output kafka.configuration.topic=configuration kafka.feedback.topic=aggregation-feedback -kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8081 -aggregation.duration.days=30 -aggregation.advance.days=1 - +kafka.window.duration.minutes=1 trigger.interval=15 grace.period.ms=270 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index f2b5cb41c2328f41bc481caa5f9a363a032da1ac..678f774a59166dd97337a584f708b1d4036c2c0b 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -2,6 +2,7 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.time.Duration; import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -46,24 +47,22 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName(), KafkaAvroSerializer.class.getCanonicalName()); - final String kafkaOutputTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final String kafkaConfigurationTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); + final String configurationTopic = + this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); - final String kafkaFeedbackTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); + final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); - final int windowSize = Integer.parseInt( - this.config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); + final Duration windowSize = Duration.ofMillis( + this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS)); this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, kafkaConfigReadProps, kafkaAggregationReadProps, kafkaWriteProps, - this.kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, + this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic, windowSize); } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index abac638ec99b32bdc4d3198dbae08fb6fdc9754f..83e0edcb8f6a3fba4559aa1bc5c17ac4d57e1312 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -15,6 +15,7 @@ import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStageWithKey; import com.hazelcast.jet.pipeline.WindowDefinition; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,7 +43,7 @@ public class Uc4PipelineFactory extends PipelineFactory { private final String kafkaConfigurationTopic; private final String kafkaFeedbackTopic; - private final int windowSize; + private final Duration windowSize; /** @@ -70,7 +71,7 @@ public class Uc4PipelineFactory extends PipelineFactory { final String kafkaOutputTopic, final String kafkaConfigurationTopic, final String kafkaFeedbackTopic, - final int windowSize) { + final Duration windowSize) { super(kafkaInputReadPropsForPipeline, kafkaInputTopic, kafkaWritePropsForPipeline, kafkaOutputTopic); @@ -230,8 +231,7 @@ public class Uc4PipelineFactory extends PipelineFactory { // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues = - dupliAsFlatmappedStage - .window(WindowDefinition.tumbling(this.windowSize)); + dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.windowSize.toMillis())); final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS AggregateOperation diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java index 5c06a2c3cdacdf5d23d2a2559c150987f4add07e..29a561d1bd039f70b2540014f970a03094418532 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java @@ -13,6 +13,7 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; +import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -52,7 +53,8 @@ public class Uc4PipelineTest extends JetTestSupport { final String testLevel1GroupName = "TEST-LEVEL1-GROUP"; final String testLevel2GroupName = "TEST-LEVEL2-GROUP"; final Double testValueInW = 10.0; - final int testWindowSize = 5000; // As window size is bugged, not necessary. + // As window size is bugged, not necessary. + final Duration testWindowSize = Duration.ofMillis(5000); // Create mocked Hazelcast Jet instance with configuration final String testClusterName = randomName(); @@ -117,10 +119,11 @@ public class Uc4PipelineTest extends JetTestSupport { // Create pipeline to test final Properties properties = new Properties(); final Uc4PipelineFactory factory = new Uc4PipelineFactory( - properties,properties,properties,properties,"","", - "","", testWindowSize); + properties, properties, properties, properties, "", "", + "", "", testWindowSize); - this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); + this.uc4Topology = + factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); this.uc4Topology.writeTo(Sinks.logger()); this.testPipeline = factory.getPipe(); @@ -202,7 +205,8 @@ public class Uc4PipelineTest extends JetTestSupport { .registerSerializer(ImmutableSensorRegistry.class, ImmutableSensorRegistryUc4Serializer.class); this.testInstance.newJob(this.testPipeline, jobConfig).join(); - Assert.fail("Job should have completed with an AssertionCompletedException, but completed normally"); + Assert.fail( + "Job should have completed with an AssertionCompletedException, but completed normally"); } catch (final CompletionException e) { final String errorMsg = e.getCause().getMessage(); @@ -210,8 +214,8 @@ public class Uc4PipelineTest extends JetTestSupport { "Job was expected to complete with AssertionCompletedException, but completed with: " + e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName())); - } catch (final Exception e){ - LOGGER.error("Test is broken",e); + } catch (final Exception e) { + LOGGER.error("Test is broken", e); } }