diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index 260dbba9c1f094ac14679b6c7c4637046a687eee..6302e4c69904aaf57e3f936ee9ad0ead11414a8d 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -22,14 +22,16 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; // Additional topics + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; // UC2 - public static final String WINDOW_SIZE_MS = "window.size.ms"; + public static final String EMIT_PERIOD_MS = "emit.period.ms"; - public static final String WINDOW_GRACE_MS = "window.grace.ms"; + public static final String GRACE_PERIOD_MS = "grace.period.ms"; // UC3 public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; diff --git a/build.gradle b/build.gradle index 9311474c4c23d8c3400768b1f7d2d538fd5597e6..1e388cb9665b43e004a1854248acc04e1cda387c 100644 --- a/build.gradle +++ b/build.gradle @@ -64,9 +64,10 @@ configure(useCaseApplications) { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' - implementation 'org.slf4j:slf4j-simple:1.6.1' + implementation 'org.slf4j:slf4j-simple:1.7.25' implementation project(':application-kafkastreams-commons') // Use JUnit test framework @@ -82,7 +83,7 @@ configure(useCaseGenerators) { implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' - implementation 'org.slf4j:slf4j-simple:1.6.1' + implementation 'org.slf4j:slf4j-simple:1.7.25' // These dependencies are used for the workload-generator-commmon implementation project(':workload-generator-commons') @@ -96,7 +97,7 @@ configure(useCaseGenerators) { configure(commonProjects) { dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - implementation 'org.slf4j:slf4j-simple:1.6.1' + implementation 'org.slf4j:slf4j-simple:1.7.25' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } diff --git a/execution/run_uc2.sh b/execution/run_uc2.sh index 68f592cc963847f56f316e3c214b2b4bb1d64fc6..a2a43a806ab8cc796f45d9c88f4cbf87049b0c3f 100755 --- a/execution/run_uc2.sh +++ b/execution/run_uc2.sh @@ -22,7 +22,7 @@ echo "EXECUTION_MINUTES: $EXECUTION_MINUTES" #PARTITIONS=40 #kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1" PARTITIONS=$PARTITIONS -kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1" +kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic aggregation-feedback --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1" # Start workload generator NUM_NESTED_GROUPS=$DIM_VALUE @@ -73,9 +73,9 @@ echo "$APPLICATION_YAML" | kubectl delete -f - #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'" echo "Finished execution, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' -while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 +while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|aggregation-feedback|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 do - kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*'" + kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|aggregation-feedback|output|configuration|theodolite-.*'" echo "Wait for topic deletion" sleep 5s #echo "Finished waiting, print topics:" @@ -104,7 +104,7 @@ do if [ $found -ne 1 ]; then echo "ZooKeeper reset was successful." break - else + else echo "ZooKeeper reset was not successful. Retrying in 5s." sleep 5s fi diff --git a/execution/uc2-application/aggregation-deployment.yaml b/execution/uc2-application/aggregation-deployment.yaml index 199966a31d0ccac1f5bb8e3b1c0e17e1cae1f8c9..3eca4749ad1decbf9b3fd1973fcad94febf355d8 100644 --- a/execution/uc2-application/aggregation-deployment.yaml +++ b/execution/uc2-application/aggregation-deployment.yaml @@ -28,6 +28,8 @@ spec: value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" + - name: LOG_LEVEL + value: "INFO" resources: limits: memory: "{{MEMORY_LIMIT}}" diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index a193fe134311e656f1010c738675210689e1b9d6..c094adfcd7952e81115dae84ed9c0d371e380c98 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -40,10 +40,11 @@ public class AggregationService { final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); uc2KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); + .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) + .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); // Configuration of the stream application final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder diff --git a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java deleted file mode 100644 index 78d72af1d3eb3585606d349166f6bafdf1048b48..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java +++ /dev/null @@ -1,31 +0,0 @@ -package theodolite.uc2.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String WINDOW_SIZE_MS = "window.size.ms"; - - public static final String WINDOW_GRACE_MS = "window.grace.ms"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - private ConfigurationKeys() {} - -} diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java deleted file mode 100644 index 3060fdaaf2605766df93b767e50e426c5ebafae9..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -package theodolite.uc2.streamprocessing; - -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.SensorRegistry; - -/** - * Factory class configuration required by {@link ChildParentsTransformer}. - */ -public class ChildParentsTransformerFactory { - - private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE"; - - /** - * Returns a {@link TransformerSupplier} for {@link ChildParentsTransformer}. - */ - public TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> getTransformerSupplier() { // NOCS - return new TransformerSupplier<>() { - @Override - public ChildParentsTransformer get() { - return new ChildParentsTransformer(STORE_NAME); - } - }; - } - - /** - * Returns a {@link StoreBuilder} for {@link ChildParentsTransformer}. - */ - public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() { - return Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(STORE_NAME), - Serdes.String(), - ParentsSerde.serde()) - .withLoggingEnabled(Map.of()); - } - - /** - * Returns the store name for {@link ChildParentsTransformer}. - */ - public String getStoreName() { - return STORE_NAME; - } - -} diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java new file mode 100644 index 0000000000000000000000000000000000000000..2b2d71c2f95d052cee19394e3e62e674776f8627 --- /dev/null +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java @@ -0,0 +1,40 @@ +package theodolite.uc2.streamprocessing; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.sensorregistry.SensorRegistry; + +/** + * Supplier class for a {@link ChildParentsTransformer}. + */ +public class ChildParentsTransformerSupplier implements + TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> { + + private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE"; + + @Override + public Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> get() { // NOCS + return new ChildParentsTransformer(STORE_NAME); + } + + @Override + public Set<StoreBuilder<?>> stores() { + final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(STORE_NAME), + Serdes.String(), + ParentsSerde.serde()) + .withLoggingEnabled(Map.of()); + + return Set.of(store); + } + +} diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java deleted file mode 100644 index cf4362a21ebd0e7b3bb9c4cad4ca871d0b3f2ea8..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -package theodolite.uc2.streamprocessing; - -import java.util.Map; -import java.util.Set; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Factory class configuration required by {@link JointFlatTransformerFactory}. - */ -public class JointFlatTransformerFactory { - - private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE"; - - /** - * Returns a {@link TransformerSupplier} for {@link JointFlatTransformer}. - */ - public TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> getTransformerSupplier() { // NOCS - return new TransformerSupplier<>() { - @Override - public JointFlatTransformer get() { - return new JointFlatTransformer(STORE_NAME); - } - }; - } - - /** - * Returns a {@link StoreBuilder} for {@link JointFlatTransformer}. - */ - public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() { - return Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(STORE_NAME), - Serdes.String(), - ParentsSerde.serde()) - .withLoggingEnabled(Map.of()); - } - - /** - * Returns the store name for {@link JointFlatTransformer}. - */ - public String getStoreName() { - return STORE_NAME; - } - -} diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java new file mode 100644 index 0000000000000000000000000000000000000000..7d9a7df3d465260623abef2b13e9f3765925bc57 --- /dev/null +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java @@ -0,0 +1,38 @@ +package theodolite.uc2.streamprocessing; + +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Supplier class for {@link JointFlatTransformerSupplier}. + */ +public class JointFlatTransformerSupplier implements + TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> { // NOCS + + private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE"; + + @Override + public Transformer<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> get() { // NOCS + return new JointFlatTransformer(STORE_NAME); + } + + @Override + public Set<StoreBuilder<?>> stores() { + final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(STORE_NAME), + Serdes.String(), + ParentsSerde.serde()) + .withLoggingEnabled(Map.of()); + + return Set.of(store); + } + +} diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index b2dfae12a0bd207b490086d8ca0767d5a6b9cb1d..c09fa3ead7553bda5cd8e8f09079f846b89d5d17 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,6 +1,5 @@ package theodolite.uc2.streamprocessing; -import com.google.common.math.StatsAccumulator; import java.time.Duration; import java.util.Set; import org.apache.kafka.common.serialization.Serdes; @@ -31,40 +30,48 @@ import titan.ccp.model.sensorregistry.SensorRegistry; * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - - - private static final int LATENCY_OUTPOUT_THRESHOLD = 1000; - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - + // Streams Variables private final String inputTopic; + private final String feedbackTopic; private final String outputTopic; private final String configurationTopic; - private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Duration windowSize; + private final Duration emitPeriod; private final Duration gracePeriod; + // SERDEs + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final StreamsBuilder builder = new StreamsBuilder(); private final RecordAggregator recordAggregator = new RecordAggregator(); - private StatsAccumulator latencyStats = new StatsAccumulator(); - private long lastTime = System.currentTimeMillis(); - /** * Create a new {@link TopologyBuilder} using the given topics. + * + * @param inputTopic The topic where to read sensor measurements from. + * @param configurationTopic The topic where the hierarchy of the sensors is published. + * @param feedbackTopic The topic where aggregation results are written to for feedback. + * @param outputTopic The topic where to publish aggregation results. + * @param emitPeriod The Duration results are emitted with. + * @param gracePeriod The Duration for how long late arriving records are considered. + * @param srAvroSerdeFactory Factory for creating avro SERDEs + * */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, - final Duration windowSize, final Duration gracePeriod) { + final String feedbackTopic, final String configurationTopic, + final Duration emitPeriod, final Duration gracePeriod, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; - this.outputTopic = outputTopic; + this.feedbackTopic = feedbackTopic; this.configurationTopic = configurationTopic; - this.srAvroSerdeFactory = srAvroSerdeFactory; - this.windowSize = windowSize; + this.outputTopic = outputTopic; + this.emitPeriod = emitPeriod; this.gracePeriod = gracePeriod; + + this.srAvroSerdeFactory = srAvroSerdeFactory; } /** - * Build the {@link Topology} for the History microservice. + * Build the {@link Topology} for the Aggregation microservice. */ public Topology build() { // 1. Build Parent-Sensor Table @@ -78,9 +85,12 @@ public class TopologyBuilder { this.buildLastValueTable(parentSensorTable, inputTable); // 4. Build Aggregations Stream - final KStream<String, AggregatedActivePowerRecord> aggregations = + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations = this.buildAggregationStream(lastValueTable); + // 6. Expose Feedback Stream + this.exposeFeedbackStream(aggregations); + // 5. Expose Aggregations Stream this.exposeOutputStream(aggregations); @@ -92,19 +102,20 @@ public class TopologyBuilder { .stream(this.inputTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.forValues())); + final KStream<String, ActivePowerRecord> aggregationsInput = this.builder - .stream(this.outputTopic, Consumed.with( + .stream(this.feedbackTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); final KTable<String, ActivePowerRecord> inputTable = values .merge(aggregationsInput) - .mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(), - v.getValueInW())) - .groupByKey(Grouped.with(Serdes.String(), + .groupByKey(Grouped.with( + Serdes.String(), this.srAvroSerdeFactory.forValues())) - .reduce((aggr, value) -> value, Materialized.with(Serdes.String(), + .reduce((aggr, value) -> value, Materialized.with( + Serdes.String(), this.srAvroSerdeFactory.forValues())); return inputTable; } @@ -115,15 +126,9 @@ public class TopologyBuilder { .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED || key == Event.SENSOR_REGISTRY_STATUS); - final ChildParentsTransformerFactory childParentsTransformerFactory = - new ChildParentsTransformerFactory(); - this.builder.addStateStore(childParentsTransformerFactory.getStoreBuilder()); - return configurationStream .mapValues(data -> SensorRegistry.fromJson(data)) - .flatTransform( - childParentsTransformerFactory.getTransformerSupplier(), - childParentsTransformerFactory.getStoreName()) + .flatTransform(new ChildParentsTransformerSupplier()) .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) .aggregate( () -> Set.<String>of(), @@ -131,33 +136,27 @@ public class TopologyBuilder { Materialized.with(Serdes.String(), ParentsSerde.serde())); } - private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( final KTable<String, Set<String>> parentSensorTable, final KTable<String, ActivePowerRecord> inputTable) { - final JointFlatTransformerFactory jointFlatMapTransformerFactory = - new JointFlatTransformerFactory(); - this.builder.addStateStore(jointFlatMapTransformerFactory.getStoreBuilder()); return inputTable .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) .toStream() - .flatTransform( - jointFlatMapTransformerFactory.getTransformerSupplier(), - jointFlatMapTransformerFactory.getStoreName()) + .flatTransform(new JointFlatTransformerSupplier()) .groupByKey(Grouped.with( SensorParentKeySerde.serde(), this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod)) + .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) .reduce( // TODO Configurable window aggregation function - (aggValue, newValue) -> newValue, - Materialized.with(SensorParentKeySerde.serde(), + (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, + Materialized.with( + SensorParentKeySerde.serde(), this.srAvroSerdeFactory.forValues())); - } - private KStream<String, AggregatedActivePowerRecord> buildAggregationStream( + private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream( final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { return lastValueTable .groupBy( @@ -165,50 +164,42 @@ public class TopologyBuilder { Grouped.with( new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), - this.windowSize.toMillis()), + this.emitPeriod.toMillis()), this.srAvroSerdeFactory.forValues())) .aggregate( - () -> null, this.recordAggregator::add, this.recordAggregator::substract, + () -> null, + this.recordAggregator::add, + this.recordAggregator::substract, Materialized.with( new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), - this.windowSize.toMillis()), + this.emitPeriod.toMillis()), this.srAvroSerdeFactory.forValues())) - .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded())) - // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) - .toStream() // TODO timestamp -1 indicates that this record is emitted by an substract event - .filter((k, record) -> record.getTimestamp() != -1) - .map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp + .filter((k, record) -> record.getTimestamp() != -1); + } + + private void exposeFeedbackStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + + aggregations + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) + .to(this.feedbackTopic, Produced.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); } - private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { + private void exposeOutputStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + aggregations - .peek((k, v) -> { - final long time = System.currentTimeMillis(); - final long latency = time - v.getTimestamp(); - this.latencyStats.add(latency); - if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("latency," - + time + ',' - + this.latencyStats.mean() + ',' - + (this.latencyStats.count() > 0 - ? this.latencyStats.populationStandardDeviation() - : Double.NaN) - + ',' - + (this.latencyStats.count() > 1 - ? this.latencyStats.sampleStandardDeviation() - : Double.NaN) - + ',' - + this.latencyStats.min() + ',' - + this.latencyStats.max() + ',' - + this.latencyStats.count()); - } - this.latencyStats = new StatsAccumulator(); - this.lastTime = time; - } - }) + // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) + .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded())) + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) .to(this.outputTopic, Produced.with( Serdes.String(), this.srAvroSerdeFactory.forValues())); diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 2f3e5c7e994a3d194810016c4664a5a83c4cc21b..16addb8510eec2254d4787edbfbfbe186996fdea 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -11,13 +11,14 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; */ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method - private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); + private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; private String inputTopic; // NOPMD + private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD private String configurationTopic; // NOPMD - private Duration windowSize; // NOPMD + private Duration emitPeriod; // NOPMD private Duration gracePeriod; // NOPMD public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { @@ -25,6 +26,11 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build return this; } + public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { + this.feedbackTopic = feedbackTopic; + return this; + } + public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { this.outputTopic = outputTopic; return this; @@ -35,8 +41,8 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build return this; } - public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { - this.windowSize = Objects.requireNonNull(windowSize); + public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) { + this.emitPeriod = Objects.requireNonNull(emitPeriod); return this; } @@ -48,16 +54,18 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build @Override protected Topology buildTopology() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, + this.feedbackTopic, this.outputTopic, this.configurationTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), - this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); + this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); return topologyBuilder.build(); } diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties index 74f47163d0fa02d1e3b582aab53bc8907a7855af..10c47960adb012ba5c572e3833a37d821189eb8e 100644 --- a/uc2-application/src/main/resources/META-INF/application.properties +++ b/uc2-application/src/main/resources/META-INF/application.properties @@ -1,18 +1,16 @@ application.name=theodolite-uc2-application application.version=0.0.1 -configuration.host=localhost -configuration.port=8082 -configuration.kafka.topic=configuration - kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input +kafka.configuration.topic=configuration +kafka.feedback.topic=aggregation-feedback kafka.output.topic=output schema.registry.url=http://localhost:8091 -window.size.ms=1000 -window.grace.ms=0 +emit.period.ms=5000 +grace.period.ms=0 num.threads=1 commit.interval.ms=100