diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
index ce2fffc7562ca4b46dfa2403032b7de25e0b76a3..36ce6ded69d38d5df45cf436e5baa3f2e4bc7489 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
@@ -26,10 +26,11 @@ public class ConfigurationKeys {
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
+ public static final String AGGREGATION_EMIT_PERIOD_SECONDS = "aggregation.emit.period.seconds";
// UC4
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String WINDOW_SIZE_UC4 = "window.size";
-
+
}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
index b2ba886a68e617328bce2616fd58a2ae4656a7d2..ff93b866078ac794221793ce10342a67c914f32e 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
@@ -1,7 +1,6 @@
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -10,8 +9,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
/**
* A microservice that aggregate incoming messages in a sliding window.
@@ -21,8 +18,8 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
- * Constructs the use case logic for UC3.
- * Retrieves the needed values and instantiates a pipeline factory.
+ * Constructs the use case logic for UC3. Retrieves the needed values and instantiates a pipeline
+ * factory.
*/
public HistoryService() {
super(LOGGER);
@@ -37,21 +34,25 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
- config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+ this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration windowSize = Duration.ofDays(Integer.parseInt(
- config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
+ this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
- config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
+ this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
+
+ final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt(
+ this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString()));
this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps,
- kafkaInputTopic,
+ this.kafkaInputTopic,
kafkaWriteProps,
kafkaOutputTopic,
windowSize,
- hoppingSize);
+ hoppingSize,
+ emitPeriod);
}
@Override
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
similarity index 93%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
index c69f433f3af7ec0484c254af9e59e7d284379cb0..6fe3343ce0f6dc4d9828a0147dce9e328ad76b02 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.util.Objects;
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
similarity index 92%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
index 91ba3f2be26f4317a1dec81caf9080da8c1edc9c..c689bab16b7392dc8d958bee61c4fdecb50ba0b7 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
similarity index 87%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
index 4eddb85efebf5b8b07317d0cd39f36b90d3f4fcd..af32575e5433f26e19361fa62ea460a78bb9dd66 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime;
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
similarity index 82%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
index 2a404781e5916473604f14f87b9c3eccf9eda342..b731ee27509e6e303437fdff9f8c9327ef99dea3 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime;
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
index 474a9e85789901ccc9d5cb51fd2df9b89159e0c0..69fe67756e0922085f795cbce4b36a9d23ba9121 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
@@ -8,7 +8,6 @@ import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
-
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -18,51 +17,52 @@ import java.util.Properties;
import java.util.TimeZone;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
/**
- * PipelineFactory for use case 3.
- * Allows to build and extend pipelines.
+ * PipelineFactory for use case 3. Allows to build and extend pipelines.
*/
public class Uc3PipelineFactory extends PipelineFactory {
private final Duration hoppingSize;
private final Duration windowSize;
+ private final Duration emitPeriod;
/**
* Build a new Pipeline.
+ *
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
* attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline.
- * @param hoppingSize The hop length of the sliding window used in the aggregation of
- * this pipeline.
- * @param windowSize The window length of the sliding window used in the aggregation of
- * this pipeline.
+ * @param hoppingSize The hop length of the sliding window used in the aggregation of this
+ * pipeline.
+ * @param windowSize The window length of the sliding window used in the aggregation of this
+ * pipeline.
*/
public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
- final String kafkaInputTopic,
- final Properties kafkaWritePropsForPipeline,
- final String kafkaOutputTopic,
- final Duration windowSize,
- final Duration hoppingSize) {
- super(kafkaReadPropsForPipeline, kafkaInputTopic,
- kafkaWritePropsForPipeline,kafkaOutputTopic);
+ final String kafkaInputTopic,
+ final Properties kafkaWritePropsForPipeline,
+ final String kafkaOutputTopic,
+ final Duration windowSize,
+ final Duration hoppingSize,
+ final Duration emitPeriod) {
+ super(
+ kafkaReadPropsForPipeline,
+ kafkaInputTopic,
+ kafkaWritePropsForPipeline,
+ kafkaOutputTopic);
this.windowSize = windowSize;
this.hoppingSize = hoppingSize;
+ this.emitPeriod = emitPeriod;
}
-
-
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
- * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data
- * for UC3.
+ *
+ * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data for UC3.
*/
@Override
public Pipeline buildPipeline() {
@@ -70,7 +70,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
// Define the source
final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
.<String, ActivePowerRecord>kafka(
- kafkaReadPropsForPipeline, kafkaInputTopic);
+ this.kafkaReadPropsForPipeline, this.kafkaInputTopic);
// Extend topology for UC3
final StreamStage<Map.Entry<String, String>> uc3Product =
@@ -80,9 +80,9 @@ public class Uc3PipelineFactory extends PipelineFactory {
uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka(
- kafkaWritePropsForPipeline, kafkaOutputTopic));
+ this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
- return pipe;
+ return this.pipe;
}
/**
@@ -98,11 +98,11 @@ public class Uc3PipelineFactory extends PipelineFactory {
* and value of the Entry object. It can be used to be further modified or directly be
* written into a sink.
*/
- public StreamStage<Map.Entry<String, String>>
- extendUc3Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+ public StreamStage<Map.Entry<String, String>> extendUc3Topology(
+ final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology.
- return pipe
+ return this.pipe
.readFrom(source)
// use Timestamps
.withNativeTimestamps(0)
@@ -112,7 +112,8 @@ public class Uc3PipelineFactory extends PipelineFactory {
.map(record -> {
final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp();
- final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
+ final LocalDateTime dateTime = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
@@ -123,15 +124,17 @@ public class Uc3PipelineFactory extends PipelineFactory {
// group by new keys
.groupingKey(Entry::getKey)
// Sliding/Hopping Window
- .window(WindowDefinition.sliding(windowSize.toMillis(), hoppingSize.toMillis()))
+ .window(WindowDefinition
+ .sliding(this.windowSize.toMillis(), this.hoppingSize.toMillis())
+ .setEarlyResultsPeriod(this.emitPeriod.toMillis()))
// get average value of group (sensoreId,hourOfDay)
.aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
- // map to return pair (sensorID,hourOfDay) -> (averaged what value)
+ // map to return pair sensorID -> stats
.map(agg -> {
- final String theValue = agg.getValue().toString();
- final String theKey = agg.getKey().toString();
- return Map.entry(theKey, theValue);
+ final String sensorId = agg.getKey().getSensorId();
+ final String stats = agg.getValue().toString(); // TODO just double, not stats
+ return Map.entry(sensorId, stats);
});
}
}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
index 0e7d3f42f71c4ea3959f1cb5093a3a579596d7a7..53e559dc32b4950ce677597e6724e016bc2bc2d2 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
@@ -6,6 +6,7 @@ kafka.input.topic=input
kafka.output.topic=output
aggregation.duration.days=30
aggregation.advance.days=1
+aggregation.emit.period.seconds=15
schema.registry.url=http://localhost:8081