From 993cf117054e3e6dd03ea5eb3e5c07e73cf7e2cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Tue, 2 Mar 2021 11:48:07 +0100 Subject: [PATCH] Rearrange use case enumeration in sources --- .../.settings/org.eclipse.jdt.ui.prefs | 2 +- benchmarks/uc2-application/Dockerfile | 2 +- benchmarks/uc2-application/build.gradle | 2 +- .../uc2}/application/HistoryService.java | 20 +- .../uc2/streamprocessing/TopologyBuilder.java | 194 +++------------- .../Uc2KafkaStreamsBuilder.java | 43 +--- .../streamprocessing/util/StatsFactory.java | 2 +- .../resources/META-INF/application.properties | 6 +- benchmarks/uc2-workload-generator/Dockerfile | 10 +- .../uc2-workload-generator/build.gradle | 2 +- .../uc2/workloadgenerator/LoadGenerator.java | 48 +--- .../.settings/org.eclipse.jdt.ui.prefs | 2 +- benchmarks/uc3-application/Dockerfile | 4 +- .../uc3/application/HistoryService.java | 10 +- .../uc3}/streamprocessing/HourOfDayKey.java | 2 +- .../streamprocessing/HourOfDayKeyFactory.java | 2 +- .../streamprocessing/HourOfDayKeySerde.java | 2 +- .../HourOfDayRecordFactory.java | 2 +- .../RecordDatabaseAdapter.java | 2 +- .../streamprocessing/StatsKeyFactory.java | 2 +- .../streamprocessing/StatsRecordFactory.java | 2 +- .../uc3/streamprocessing/TopologyBuilder.java | 55 +++-- .../Uc3KafkaStreamsBuilder.java | 24 +- .../resources/META-INF/application.properties | 3 +- benchmarks/uc3-workload-generator/Dockerfile | 12 +- .../uc3/workloadgenerator/LoadGenerator.java | 5 +- .../.settings/org.eclipse.jdt.ui.prefs | 2 +- benchmarks/uc4-application/Dockerfile | 4 +- .../README.md | 0 benchmarks/uc4-application/build.gradle | 2 +- .../uc4}/application/AggregationService.java | 10 +- .../ChildParentsTransformer.java | 2 +- .../ChildParentsTransformerSupplier.java | 2 +- .../JointFlatTransformer.java | 2 +- .../JointFlatTransformerSupplier.java | 2 +- .../streamprocessing/JointRecordParents.java | 2 +- .../OptionalParentsSerde.java | 2 +- .../uc4}/streamprocessing/ParentsSerde.java | 2 +- .../streamprocessing/RecordAggregator.java | 2 +- .../streamprocessing/SensorParentKey.java | 2 +- .../SensorParentKeySerde.java | 2 +- .../uc4/streamprocessing/TopologyBuilder.java | 215 +++++++++++++----- .../Uc4KafkaStreamsBuilder.java | 41 +++- .../resources/META-INF/application.properties | 7 +- .../OptionalParentsSerdeTest.java | 2 +- .../streamprocessing/ParentsSerdeTest.java | 2 +- .../SensorParentKeySerdeTest.java | 2 +- .../uc4}/streamprocessing/SerdeTester.java | 2 +- .../streamprocessing/SerdeTesterFactory.java | 2 +- benchmarks/uc4-workload-generator/Dockerfile | 2 +- .../uc4-workload-generator/build.gradle | 2 +- .../workloadgenerator/ConfigPublisher.java | 2 +- .../uc4/workloadgenerator/LoadGenerator.java | 51 ++++- .../SensorRegistryBuilder.java | 2 +- .../resources/META-INF/application.properties | 0 .../SensorRegistryBuilderTest.java | 2 +- 56 files changed, 414 insertions(+), 418 deletions(-) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc2-application/src/main/java/theodolite/uc2}/application/HistoryService.java (64%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc2-application/src/main/java/theodolite/uc2}/streamprocessing/util/StatsFactory.java (91%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/HourOfDayKey.java (96%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/HourOfDayKeyFactory.java (92%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/HourOfDayKeySerde.java (96%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/HourOfDayRecordFactory.java (95%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/RecordDatabaseAdapter.java (98%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/StatsKeyFactory.java (88%) rename benchmarks/{uc4-application/src/main/java/theodolite/uc4 => uc3-application/src/main/java/theodolite/uc3}/streamprocessing/StatsRecordFactory.java (94%) rename benchmarks/{uc2-application => uc4-application}/README.md (100%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/application/AggregationService.java (85%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/ChildParentsTransformer.java (99%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/ChildParentsTransformerSupplier.java (97%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/JointFlatTransformer.java (98%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/JointFlatTransformerSupplier.java (96%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/JointRecordParents.java (96%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/OptionalParentsSerde.java (97%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/ParentsSerde.java (96%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/RecordAggregator.java (97%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/SensorParentKey.java (96%) rename benchmarks/{uc2-application/src/main/java/theodolite/uc2 => uc4-application/src/main/java/theodolite/uc4}/streamprocessing/SensorParentKeySerde.java (95%) rename benchmarks/{uc2-application/src/test/java/theodolite/uc2 => uc4-application/src/test/java/theodolite/uc4}/streamprocessing/OptionalParentsSerdeTest.java (95%) rename benchmarks/{uc2-application/src/test/java/theodolite/uc2 => uc4-application/src/test/java/theodolite/uc4}/streamprocessing/ParentsSerdeTest.java (91%) rename benchmarks/{uc2-application/src/test/java/theodolite/uc2 => uc4-application/src/test/java/theodolite/uc4}/streamprocessing/SensorParentKeySerdeTest.java (92%) rename benchmarks/{uc2-application/src/test/java/theodolite/uc2 => uc4-application/src/test/java/theodolite/uc4}/streamprocessing/SerdeTester.java (94%) rename benchmarks/{uc2-application/src/test/java/theodolite/uc2 => uc4-application/src/test/java/theodolite/uc4}/streamprocessing/SerdeTesterFactory.java (94%) rename benchmarks/{uc2-workload-generator/src/main/java/theodolite/uc2 => uc4-workload-generator/src/main/java/theodolite/uc4}/workloadgenerator/ConfigPublisher.java (98%) rename benchmarks/{uc2-workload-generator/src/main/java/theodolite/uc2 => uc4-workload-generator/src/main/java/theodolite/uc4}/workloadgenerator/SensorRegistryBuilder.java (97%) rename benchmarks/{uc2-workload-generator => uc4-workload-generator}/src/main/resources/META-INF/application.properties (100%) rename benchmarks/{uc2-workload-generator/src/test/java/theodolite/uc2 => uc4-workload-generator/src/test/java/theodolite/uc4}/workloadgenerator/SensorRegistryBuilderTest.java (97%) diff --git a/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891..fa98ca63d 100644 --- a/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc2-application/Dockerfile b/benchmarks/uc2-application/Dockerfile index 99076645a..5177dcede 100644 --- a/benchmarks/uc2-application/Dockerfile +++ b/benchmarks/uc2-application/Dockerfile @@ -2,5 +2,5 @@ FROM openjdk:11-slim ADD build/distributions/uc2-application.tar / -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc2-application/bin/uc2-application \ No newline at end of file diff --git a/benchmarks/uc2-application/build.gradle b/benchmarks/uc2-application/build.gradle index ea3d8779a..e4d3f5346 100644 --- a/benchmarks/uc2-application/build.gradle +++ b/benchmarks/uc2-application/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc2.application.AggregationService" +mainClassName = "theodolite.uc2.application.HistoryService" diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java similarity index 64% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java rename to benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java index 12f35e8dc..1aa28400c 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java @@ -1,11 +1,12 @@ -package theodolite.uc4.application; +package theodolite.uc2.application; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; -import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; +import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; /** @@ -18,6 +19,8 @@ public class HistoryService { private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + private final int windowDurationMinutes = Integer + .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); /** * Start the service. @@ -31,17 +34,12 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); - uc4KafkaStreamsBuilder + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); + uc2KafkaStreamsBuilder .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .aggregtionDuration( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) - .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index 74e9bb99b..eda7c495a 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,206 +1,74 @@ package theodolite.uc2.streamprocessing; +import com.google.common.math.Stats; import java.time.Duration; import java.util.Properties; -import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Suppressed; -import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.uc2.streamprocessing.util.StatsFactory; +import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; -import titan.ccp.configuration.events.Event; -import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord; -import titan.ccp.model.sensorregistry.SensorRegistry; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - // Streams Variables + + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private final String inputTopic; - private final String feedbackTopic; private final String outputTopic; - private final String configurationTopic; - private final Duration emitPeriod; - private final Duration gracePeriod; - - // Serdes private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final Duration duration; private final StreamsBuilder builder = new StreamsBuilder(); - private final RecordAggregator recordAggregator = new RecordAggregator(); /** * Create a new {@link TopologyBuilder} using the given topics. - * - * @param inputTopic The topic where to read sensor measurements from. - * @param configurationTopic The topic where the hierarchy of the sensors is published. - * @param feedbackTopic The topic where aggregation results are written to for feedback. - * @param outputTopic The topic where to publish aggregation results. - * @param emitPeriod The Duration results are emitted with. - * @param gracePeriod The Duration for how long late arriving records are considered. - * @param srAvroSerdeFactory Factory for creating avro SERDEs - * */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final String feedbackTopic, final String configurationTopic, - final Duration emitPeriod, final Duration gracePeriod, - final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, + final Duration duration) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; - this.feedbackTopic = feedbackTopic; - this.configurationTopic = configurationTopic; - this.emitPeriod = emitPeriod; - this.gracePeriod = gracePeriod; - this.srAvroSerdeFactory = srAvroSerdeFactory; + this.duration = duration; } /** - * Build the {@link Topology} for the Aggregation microservice. + * Build the {@link Topology} for the History microservice. */ public Topology build(final Properties properties) { - // 1. Build Parent-Sensor Table - final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); - - // 2. Build Input Table - final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable(); - - // 3. Build Last Value Table from Input and Parent-Sensor Table - final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable = - this.buildLastValueTable(parentSensorTable, inputTable); - - // 4. Build Aggregations Stream - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations = - this.buildAggregationStream(lastValueTable); - - // 6. Expose Feedback Stream - this.exposeFeedbackStream(aggregations); - - // 5. Expose Aggregations Stream - this.exposeOutputStream(aggregations); - - return this.builder.build(properties); - } - - private KTable<String, ActivePowerRecord> buildInputTable() { - final KStream<String, ActivePowerRecord> values = this.builder - .stream(this.inputTopic, Consumed.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - - final KStream<String, ActivePowerRecord> aggregationsInput = this.builder - .stream(this.feedbackTopic, Consumed.with( - Serdes.String(), - this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) - .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); - - final KTable<String, ActivePowerRecord> inputTable = values - .merge(aggregationsInput) - .groupByKey(Grouped.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())) - .reduce((aggr, value) -> value, Materialized.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - return inputTable; - } - - private KTable<String, Set<String>> buildParentSensorTable() { - final KStream<Event, String> configurationStream = this.builder - .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String())) - .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED - || key == Event.SENSOR_REGISTRY_STATUS); - - return configurationStream - .mapValues(data -> SensorRegistry.fromJson(data)) - .flatTransform(new ChildParentsTransformerSupplier()) - .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) - .aggregate( - () -> Set.<String>of(), - (key, newValue, oldValue) -> newValue.orElse(null), - Materialized.with(Serdes.String(), ParentsSerde.serde())); - } - - private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( - final KTable<String, Set<String>> parentSensorTable, - final KTable<String, ActivePowerRecord> inputTable) { - - return inputTable - .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) - .toStream() - .flatTransform(new JointFlatTransformerSupplier()) - .groupByKey(Grouped.with( - SensorParentKeySerde.serde(), - this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) - .reduce( - // TODO Configurable window aggregation function - (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, - Materialized.with( - SensorParentKeySerde.serde(), - this.srAvroSerdeFactory.forValues())); - } - - private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream( - final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { - return lastValueTable - .groupBy( - (k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v), - Grouped.with( - new WindowedSerdes.TimeWindowedSerde<>( - Serdes.String(), - this.emitPeriod.toMillis()), - this.srAvroSerdeFactory.forValues())) + this.builder + .stream(this.inputTopic, + Consumed.with(Serdes.String(), + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) + .groupByKey() + .windowedBy(TimeWindows.of(this.duration)) + // .aggregate( + // () -> 0.0, + // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), + // Materialized.with(Serdes.String(), Serdes.Double())) .aggregate( - () -> null, - this.recordAggregator::add, - this.recordAggregator::substract, + () -> Stats.of(), + (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), Materialized.with( - new WindowedSerdes.TimeWindowedSerde<>( - Serdes.String(), - this.emitPeriod.toMillis()), - this.srAvroSerdeFactory.forValues())) - // TODO timestamp -1 indicates that this record is emitted by an substract event - .filter((k, record) -> record.getTimestamp() != -1); - } - - private void exposeFeedbackStream( - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { - - aggregations + Serdes.String(), + GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() - .filter((k, record) -> record != null) - .selectKey((k, v) -> k.key()) - .to(this.feedbackTopic, Produced.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - } - - private void exposeOutputStream( - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + .map((k, s) -> KeyValue.pair(k.key(), s.toString())) + .peek((k, v) -> LOGGER.info(k + ": " + v)) + .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); - aggregations - // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) - .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded())) - .toStream() - .filter((k, record) -> record != null) - .selectKey((k, v) -> k.key()) - .to(this.outputTopic, Produced.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); + return this.builder.build(properties); } } diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 7e077b101..1d6019f27 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -11,62 +11,33 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. */ -public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { - private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration emitPeriod; // NOPMD - private Duration gracePeriod; // NOPMD + private Duration windowDuration; // NOPMD public Uc2KafkaStreamsBuilder(final Configuration config) { super(config); } - public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { - this.feedbackTopic = feedbackTopic; - return this; - } - public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { this.outputTopic = outputTopic; return this; } - public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) { - this.emitPeriod = Objects.requireNonNull(emitPeriod); - return this; - } - - public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); + public Uc2KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; return this; } @Override protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.feedbackTopic, - this.configurationTopic, - this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java similarity index 91% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java rename to benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java index e97fbcd21..e4aff4fc8 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing.util; +package theodolite.uc2.streamprocessing.util; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; diff --git a/benchmarks/uc2-application/src/main/resources/META-INF/application.properties b/benchmarks/uc2-application/src/main/resources/META-INF/application.properties index 8f1af5f59..15293b138 100644 --- a/benchmarks/uc2-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc2-application/src/main/resources/META-INF/application.properties @@ -3,11 +3,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input -kafka.configuration.topic=configuration -kafka.feedback.topic=aggregation-feedback kafka.output.topic=output +kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8091 - -emit.period.ms=5000 -grace.period.ms=0 \ No newline at end of file diff --git a/benchmarks/uc2-workload-generator/Dockerfile b/benchmarks/uc2-workload-generator/Dockerfile index 162243e05..55593e029 100644 --- a/benchmarks/uc2-workload-generator/Dockerfile +++ b/benchmarks/uc2-workload-generator/Dockerfile @@ -1,6 +1,6 @@ -FROM openjdk:11-slim - -ADD build/distributions/uc2-workload-generator.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +FROM openjdk:11-slim + +ADD build/distributions/uc2-workload-generator.tar / + +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc2-workload-generator/bin/uc2-workload-generator \ No newline at end of file diff --git a/benchmarks/uc2-workload-generator/build.gradle b/benchmarks/uc2-workload-generator/build.gradle index b92e0c2ed..f2c3e5d2e 100644 --- a/benchmarks/uc2-workload-generator/build.gradle +++ b/benchmarks/uc2-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 826387c48..2c5b59bc1 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,65 +1,19 @@ package theodolite.uc2.workloadgenerator; -import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KeySpace; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.SensorRegistry; /** * Load generator for Theodolite use case UC2. */ public final class LoadGenerator { - private static final int SLEEP_PERIOD = 30_000; - private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private LoadGenerator() {} - /** - * Start load generator. - */ public static void main(final String[] args) { - final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( - System.getenv("SEND_REGISTRY"), - "true")); - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final int numSensors = Integer.parseInt(Objects.requireNonNullElse( - System.getenv("NUM_SENSORS"), - "1")); - final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( - System.getenv("NUM_NESTED_GROUPS"), - "1")); - - // Build sensor hierarchy - final SensorRegistry sensorRegistry = - new SensorRegistryBuilder(numNestedGroups, numSensors).build(); - LOGGER.info("Start workload generator for use case UC2"); - theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() - .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) - .withBeforeAction(() -> { - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - LOGGER.info("Configuration sent."); - - LOGGER.info("Now wait 30 seconds..."); - try { - Thread.sleep(SLEEP_PERIOD); - } catch (final InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - LOGGER.info("...and start generating load."); - } - }) - .run(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } - } diff --git a/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891..fa98ca63d 100644 --- a/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc3-application/Dockerfile b/benchmarks/uc3-application/Dockerfile index c70a24268..61141baaf 100644 --- a/benchmarks/uc3-application/Dockerfile +++ b/benchmarks/uc3-application/Dockerfile @@ -1,8 +1,6 @@ FROM openjdk:11-slim - ADD build/distributions/uc3-application.tar / - CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ - /uc3-application/bin/uc3-application \ No newline at end of file + /uc3-application/bin/uc3-application diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 349512f98..84fb29969 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -1,7 +1,6 @@ package theodolite.uc3.application; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; @@ -19,8 +18,6 @@ public class HistoryService { private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - private final int windowDurationMinutes = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); /** * Start the service. @@ -34,11 +31,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { + // Use case specific stream configuration final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + .aggregtionDuration( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) + .aggregationAdvance( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + // Configuration of the stream application final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java similarity index 96% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java index 97807e3bd..549674f9f 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.util.Objects; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java similarity index 92% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java index edb9ad2b2..837ca9d32 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.time.LocalDateTime; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java similarity index 96% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java index ff404ab12..6855907e7 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java similarity index 95% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java index 7249309ce..dfa9b95b0 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import org.apache.kafka.streams.kstream.Windowed; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java similarity index 98% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java index 8f693d5d3..342cb3e04 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.util.Collection; import java.util.List; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java similarity index 88% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java index cf67efbd3..0e414c4a1 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.time.LocalDateTime; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java similarity index 94% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java index 79eb4b9f7..31935df9d 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import org.apache.avro.specific.SpecificRecord; diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index d6d6d4ffb..1e976c071 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -2,17 +2,20 @@ package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Properties; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import theodolite.uc3.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -23,12 +26,16 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter + private final String inputTopic; private final String outputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Duration duration; + private final Duration aggregtionDuration; + private final Duration aggregationAdvance; private final StreamsBuilder builder = new StreamsBuilder(); @@ -37,37 +44,51 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic, final String outputTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, - final Duration duration) { + final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.srAvroSerdeFactory = srAvroSerdeFactory; - this.duration = duration; + this.aggregtionDuration = aggregtionDuration; + this.aggregationAdvance = aggregationAdvance; } /** * Build the {@link Topology} for the History microservice. */ public Topology build(final Properties properties) { + final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); + this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .groupByKey() - .windowedBy(TimeWindows.of(this.duration)) - // .aggregate( - // () -> 0.0, - // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), - // Materialized.with(Serdes.String(), Serdes.Double())) + .selectKey((key, value) -> { + final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); + return keyFactory.createKey(value.getIdentifier(), dateTime); + }) + .groupByKey( + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) + .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), - Materialized.with( - Serdes.String(), + Materialized.with(keySerde, GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() - .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> LOGGER.info(k + ": " + v)) - .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); + .map((key, stats) -> KeyValue.pair( + keyFactory.getSensorId(key.key()), + stats.toString())) + // TODO + // statsRecordFactory.create(key, value))) + // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging + .to( + this.outputTopic, + Produced.with( + Serdes.String(), + Serdes.String())); + // this.serdes.avroValues())); return this.builder.build(properties); } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 70113271a..ea9b06460 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -14,7 +14,8 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD public Uc3KafkaStreamsBuilder(final Configuration config) { super(config); @@ -25,8 +26,13 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { return this; } - public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; + public Uc3KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc3KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; return this; } @@ -34,10 +40,16 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), + this.aggregtionDuration, + this.aggregationAdvance); - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc3-application/src/main/resources/META-INF/application.properties b/benchmarks/uc3-application/src/main/resources/META-INF/application.properties index 011406f7e..1273441a6 100644 --- a/benchmarks/uc3-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc3-application/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 +aggregation.duration.days=30 +aggregation.advance.days=1 schema.registry.url=http://localhost:8091 diff --git a/benchmarks/uc3-workload-generator/Dockerfile b/benchmarks/uc3-workload-generator/Dockerfile index 6efd5ec61..8422c9d53 100644 --- a/benchmarks/uc3-workload-generator/Dockerfile +++ b/benchmarks/uc3-workload-generator/Dockerfile @@ -1,6 +1,6 @@ -FROM openjdk:11-slim - -ADD build/distributions/uc3-workload-generator.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ - /uc3-workload-generator/bin/uc3-workload-generator \ No newline at end of file +FROM openjdk:11-slim + +ADD build/distributions/uc3-workload-generator.tar / + +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ + /uc3-workload-generator/bin/uc3-workload-generator diff --git a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 662113fd1..97527abfd 100644 --- a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -10,10 +10,13 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private LoadGenerator() {} + private LoadGenerator() { + throw new UnsupportedOperationException(); + } public static void main(final String[] args) { LOGGER.info("Start workload generator for use case UC3"); theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } + } diff --git a/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891..fa98ca63d 100644 --- a/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc4-application/Dockerfile b/benchmarks/uc4-application/Dockerfile index 8cb65188a..add251c0e 100644 --- a/benchmarks/uc4-application/Dockerfile +++ b/benchmarks/uc4-application/Dockerfile @@ -1,8 +1,6 @@ FROM openjdk:11-slim - ADD build/distributions/uc4-application.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc4-application/bin/uc4-application diff --git a/benchmarks/uc2-application/README.md b/benchmarks/uc4-application/README.md similarity index 100% rename from benchmarks/uc2-application/README.md rename to benchmarks/uc4-application/README.md diff --git a/benchmarks/uc4-application/build.gradle b/benchmarks/uc4-application/build.gradle index 566630221..9cb1b311d 100644 --- a/benchmarks/uc4-application/build.gradle +++ b/benchmarks/uc4-application/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc4.application.HistoryService" +mainClassName = "theodolite.uc4.application.AggregationService" diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java similarity index 85% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java index 2f828278f..5c9d0910e 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java @@ -1,11 +1,11 @@ -package theodolite.uc2.application; +package theodolite.uc4.application; import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; -import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; +import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; /** @@ -36,15 +36,15 @@ public class AggregationService { * @param clusterSession the database session which the application should use. */ private void createKafkaStreamsApplication() { - final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); - uc2KafkaStreamsBuilder + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); + uc4KafkaStreamsBuilder .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); - final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java similarity index 99% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java index d4f9097ad..db28c86bc 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java index 2b2d71c2f..d17757d68 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java similarity index 98% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java index 724c7f6e2..d3500adff 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import com.google.common.base.MoreObjects; import java.util.ArrayList; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java index 7d9a7df3d..51c7ce1f6 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java index cba05f1ed..e9a5a824e 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Objects; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java index 5cb8f1ed8..a1e9767da 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.HashSet; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java index 266eaad01..df6f848b5 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.HashSet; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java index 9564e994d..34ef3762d 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; import titan.ccp.model.records.ActivePowerRecord; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java index a4fb5b339..667cc6d5e 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Objects; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java similarity index 95% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java index d6773c615..63b9e44b5 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index a0c87ba47..623870313 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -1,95 +1,206 @@ package theodolite.uc4.streamprocessing; -import com.google.common.math.Stats; import java.time.Duration; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Properties; -import org.apache.kafka.common.serialization.Serde; +import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; -import theodolite.uc4.streamprocessing.util.StatsFactory; -import titan.ccp.common.kafka.GenericSerde; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.configuration.events.Event; +import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; +import titan.ccp.model.sensorregistry.SensorRegistry; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - - // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - - private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter - - + // Streams Variables private final String inputTopic; + private final String feedbackTopic; private final String outputTopic; + private final String configurationTopic; + private final Duration emitPeriod; + private final Duration gracePeriod; + + // Serdes private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Duration aggregtionDuration; - private final Duration aggregationAdvance; private final StreamsBuilder builder = new StreamsBuilder(); + private final RecordAggregator recordAggregator = new RecordAggregator(); /** * Create a new {@link TopologyBuilder} using the given topics. + * + * @param inputTopic The topic where to read sensor measurements from. + * @param configurationTopic The topic where the hierarchy of the sensors is published. + * @param feedbackTopic The topic where aggregation results are written to for feedback. + * @param outputTopic The topic where to publish aggregation results. + * @param emitPeriod The Duration results are emitted with. + * @param gracePeriod The Duration for how long late arriving records are considered. + * @param srAvroSerdeFactory Factory for creating avro SERDEs + * */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, - final Duration aggregtionDuration, final Duration aggregationAdvance) { + final String feedbackTopic, final String configurationTopic, + final Duration emitPeriod, final Duration gracePeriod, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.feedbackTopic = feedbackTopic; + this.configurationTopic = configurationTopic; + this.emitPeriod = emitPeriod; + this.gracePeriod = gracePeriod; + this.srAvroSerdeFactory = srAvroSerdeFactory; - this.aggregtionDuration = aggregtionDuration; - this.aggregationAdvance = aggregationAdvance; } /** - * Build the {@link Topology} for the History microservice. + * Build the {@link Topology} for the Aggregation microservice. */ public Topology build(final Properties properties) { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); - final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - - this.builder - .stream(this.inputTopic, - Consumed.with(Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .selectKey((key, value) -> { - final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); - final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); - return keyFactory.createKey(value.getIdentifier(), dateTime); - }) - .groupByKey( - Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) + // 1. Build Parent-Sensor Table + final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); + + // 2. Build Input Table + final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable(); + + // 3. Build Last Value Table from Input and Parent-Sensor Table + final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable = + this.buildLastValueTable(parentSensorTable, inputTable); + + // 4. Build Aggregations Stream + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations = + this.buildAggregationStream(lastValueTable); + + // 6. Expose Feedback Stream + this.exposeFeedbackStream(aggregations); + + // 5. Expose Aggregations Stream + this.exposeOutputStream(aggregations); + + return this.builder.build(properties); + } + + private KTable<String, ActivePowerRecord> buildInputTable() { + final KStream<String, ActivePowerRecord> values = this.builder + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + + final KStream<String, ActivePowerRecord> aggregationsInput = this.builder + .stream(this.feedbackTopic, Consumed.with( + Serdes.String(), + this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) + .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); + + final KTable<String, ActivePowerRecord> inputTable = values + .merge(aggregationsInput) + .groupByKey(Grouped.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())) + .reduce((aggr, value) -> value, Materialized.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + return inputTable; + } + + private KTable<String, Set<String>> buildParentSensorTable() { + final KStream<Event, String> configurationStream = this.builder + .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String())) + .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED + || key == Event.SENSOR_REGISTRY_STATUS); + + return configurationStream + .mapValues(data -> SensorRegistry.fromJson(data)) + .flatTransform(new ChildParentsTransformerSupplier()) + .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) .aggregate( - () -> Stats.of(), - (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), - Materialized.with(keySerde, - GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) + () -> Set.<String>of(), + (key, newValue, oldValue) -> newValue.orElse(null), + Materialized.with(Serdes.String(), ParentsSerde.serde())); + } + + private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( + final KTable<String, Set<String>> parentSensorTable, + final KTable<String, ActivePowerRecord> inputTable) { + + return inputTable + .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) .toStream() - .map((key, stats) -> KeyValue.pair( - keyFactory.getSensorId(key.key()), - stats.toString())) - // TODO - // statsRecordFactory.create(key, value))) - // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging - .to( - this.outputTopic, - Produced.with( - Serdes.String(), - Serdes.String())); - // this.serdes.avroValues())); + .flatTransform(new JointFlatTransformerSupplier()) + .groupByKey(Grouped.with( + SensorParentKeySerde.serde(), + this.srAvroSerdeFactory.forValues())) + .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) + .reduce( + // TODO Configurable window aggregation function + (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, + Materialized.with( + SensorParentKeySerde.serde(), + this.srAvroSerdeFactory.forValues())); + } - return this.builder.build(properties); + private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream( + final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { + return lastValueTable + .groupBy( + (k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v), + Grouped.with( + new WindowedSerdes.TimeWindowedSerde<>( + Serdes.String(), + this.emitPeriod.toMillis()), + this.srAvroSerdeFactory.forValues())) + .aggregate( + () -> null, + this.recordAggregator::add, + this.recordAggregator::substract, + Materialized.with( + new WindowedSerdes.TimeWindowedSerde<>( + Serdes.String(), + this.emitPeriod.toMillis()), + this.srAvroSerdeFactory.forValues())) + // TODO timestamp -1 indicates that this record is emitted by an substract event + .filter((k, record) -> record.getTimestamp() != -1); + } + + private void exposeFeedbackStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + + aggregations + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) + .to(this.feedbackTopic, Produced.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + } + + private void exposeOutputStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + + aggregations + // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) + .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded())) + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) + .to(this.outputTopic, Produced.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); } } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 67c652967..9f1af3ba0 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -11,44 +11,61 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. */ -public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD + private String configurationTopic; // NOPMD + private Duration emitPeriod; // NOPMD + private Duration gracePeriod; // NOPMD public Uc4KafkaStreamsBuilder(final Configuration config) { super(config); } + public Uc4KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { + this.feedbackTopic = feedbackTopic; + return this; + } + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { this.outputTopic = outputTopic; return this; } - public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; + public Uc4KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc4KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) { + this.emitPeriod = Objects.requireNonNull(emitPeriod); return this; } - public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; + public Uc4KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); return this; } @Override protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), - this.aggregtionDuration, - this.aggregationAdvance); + this.feedbackTopic, + this.configurationTopic, + this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc4-application/src/main/resources/META-INF/application.properties b/benchmarks/uc4-application/src/main/resources/META-INF/application.properties index b46681533..ce0609107 100644 --- a/benchmarks/uc4-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc4-application/src/main/resources/META-INF/application.properties @@ -3,8 +3,11 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input +kafka.configuration.topic=configuration +kafka.feedback.topic=aggregation-feedback kafka.output.topic=output -aggregation.duration.days=30 -aggregation.advance.days=1 schema.registry.url=http://localhost:8091 + +emit.period.ms=5000 +grace.period.ms=0 \ No newline at end of file diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java similarity index 95% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java index 54e8c460e..600fc0b15 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Optional; import java.util.Set; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java similarity index 91% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java index f12604d6a..994593e27 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Set; import org.junit.Test; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java similarity index 92% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java index 7ca99bcb7..34f87fa98 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.junit.Test; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java similarity index 94% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java index 8e9f5a360..b5d5f942d 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import static org.junit.Assert.assertEquals; import java.util.function.Function; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java similarity index 94% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java index 5cdbfc605..e8083ed77 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.common.serialization.Serde; diff --git a/benchmarks/uc4-workload-generator/Dockerfile b/benchmarks/uc4-workload-generator/Dockerfile index 8f077637a..f39923e59 100644 --- a/benchmarks/uc4-workload-generator/Dockerfile +++ b/benchmarks/uc4-workload-generator/Dockerfile @@ -2,5 +2,5 @@ FROM openjdk:11-slim ADD build/distributions/uc4-workload-generator.tar / -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc4-workload-generator/bin/uc4-workload-generator diff --git a/benchmarks/uc4-workload-generator/build.gradle b/benchmarks/uc4-workload-generator/build.gradle index 76bbce013..8865ec939 100644 --- a/benchmarks/uc4-workload-generator/build.gradle +++ b/benchmarks/uc4-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc4.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc4.workloadgenerator.LoadGenerator" diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java similarity index 98% rename from benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java rename to benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java index ad24e8e4b..ad0ee7082 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; diff --git a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index c0d885ed1..8320d16b9 100644 --- a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,22 +1,65 @@ package theodolite.uc4.workloadgenerator; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.KeySpace; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.sensorregistry.SensorRegistry; /** * Load generator for Theodolite use case UC4. */ public final class LoadGenerator { + private static final int SLEEP_PERIOD = 30_000; + private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private LoadGenerator() { - throw new UnsupportedOperationException(); - } + private LoadGenerator() {} + /** + * Start load generator. + */ public static void main(final String[] args) { + final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv("SEND_REGISTRY"), + "true")); + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_SENSORS"), + "1")); + final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_NESTED_GROUPS"), + "1")); + + // Build sensor hierarchy + final SensorRegistry sensorRegistry = + new SensorRegistryBuilder(numNestedGroups, numSensors).build(); + LOGGER.info("Start workload generator for use case UC4"); - theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() + .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) + .withBeforeAction(() -> { + if (sendRegistry) { + final ConfigPublisher configPublisher = + new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + LOGGER.info("Configuration sent."); + + LOGGER.info("Now wait 30 seconds..."); + try { + Thread.sleep(SLEEP_PERIOD); + } catch (final InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + LOGGER.info("...and start generating load."); + } + }) + .run(); } } diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java similarity index 97% rename from benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java rename to benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java index 7c34ac894..60303056a 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; diff --git a/benchmarks/uc2-workload-generator/src/main/resources/META-INF/application.properties b/benchmarks/uc4-workload-generator/src/main/resources/META-INF/application.properties similarity index 100% rename from benchmarks/uc2-workload-generator/src/main/resources/META-INF/application.properties rename to benchmarks/uc4-workload-generator/src/main/resources/META-INF/application.properties diff --git a/benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java b/benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java similarity index 97% rename from benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java rename to benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java index 17b208eda..424c84ec9 100644 --- a/benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java +++ b/benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import java.util.Collection; -- GitLab