diff --git a/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891754324a6e1bf55348b6a38f592bb301..fa98ca63d77bdee891150bd6713f70197a75cefc 100644 --- a/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc1-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc2-application/Dockerfile b/benchmarks/uc2-application/Dockerfile index 99076645ab5e1c3b1a77d2aec7408dc8846f9f51..5177dcede26016990b73467460fd358823c43c76 100644 --- a/benchmarks/uc2-application/Dockerfile +++ b/benchmarks/uc2-application/Dockerfile @@ -2,5 +2,5 @@ FROM openjdk:11-slim ADD build/distributions/uc2-application.tar / -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc2-application/bin/uc2-application \ No newline at end of file diff --git a/benchmarks/uc2-application/build.gradle b/benchmarks/uc2-application/build.gradle index ea3d8779a0cd5406808df190d623d1508a143b9d..e4d3f5346e401def9c9a5a49820d0682eafb0ad3 100644 --- a/benchmarks/uc2-application/build.gradle +++ b/benchmarks/uc2-application/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc2.application.AggregationService" +mainClassName = "theodolite.uc2.application.HistoryService" diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java similarity index 64% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java rename to benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java index 12f35e8dcc532b19e470722094ba5aff07420ad2..1aa28400cc9d55c77518a880d8cc2f48a2823a6b 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/HistoryService.java @@ -1,11 +1,12 @@ -package theodolite.uc4.application; +package theodolite.uc2.application; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; -import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; +import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; /** @@ -18,6 +19,8 @@ public class HistoryService { private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + private final int windowDurationMinutes = Integer + .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); /** * Start the service. @@ -31,17 +34,12 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); - uc4KafkaStreamsBuilder + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); + uc2KafkaStreamsBuilder .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .aggregtionDuration( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) - .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index 74e9bb99b80efec4c27d7eb50668d622a5d951f9..eda7c495a2cff6d58b62a8a6a74ea8e1b2d89aca 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,206 +1,74 @@ package theodolite.uc2.streamprocessing; +import com.google.common.math.Stats; import java.time.Duration; import java.util.Properties; -import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Suppressed; -import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.uc2.streamprocessing.util.StatsFactory; +import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; -import titan.ccp.configuration.events.Event; -import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord; -import titan.ccp.model.sensorregistry.SensorRegistry; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - // Streams Variables + + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private final String inputTopic; - private final String feedbackTopic; private final String outputTopic; - private final String configurationTopic; - private final Duration emitPeriod; - private final Duration gracePeriod; - - // Serdes private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final Duration duration; private final StreamsBuilder builder = new StreamsBuilder(); - private final RecordAggregator recordAggregator = new RecordAggregator(); /** * Create a new {@link TopologyBuilder} using the given topics. - * - * @param inputTopic The topic where to read sensor measurements from. - * @param configurationTopic The topic where the hierarchy of the sensors is published. - * @param feedbackTopic The topic where aggregation results are written to for feedback. - * @param outputTopic The topic where to publish aggregation results. - * @param emitPeriod The Duration results are emitted with. - * @param gracePeriod The Duration for how long late arriving records are considered. - * @param srAvroSerdeFactory Factory for creating avro SERDEs - * */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final String feedbackTopic, final String configurationTopic, - final Duration emitPeriod, final Duration gracePeriod, - final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, + final Duration duration) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; - this.feedbackTopic = feedbackTopic; - this.configurationTopic = configurationTopic; - this.emitPeriod = emitPeriod; - this.gracePeriod = gracePeriod; - this.srAvroSerdeFactory = srAvroSerdeFactory; + this.duration = duration; } /** - * Build the {@link Topology} for the Aggregation microservice. + * Build the {@link Topology} for the History microservice. */ public Topology build(final Properties properties) { - // 1. Build Parent-Sensor Table - final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); - - // 2. Build Input Table - final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable(); - - // 3. Build Last Value Table from Input and Parent-Sensor Table - final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable = - this.buildLastValueTable(parentSensorTable, inputTable); - - // 4. Build Aggregations Stream - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations = - this.buildAggregationStream(lastValueTable); - - // 6. Expose Feedback Stream - this.exposeFeedbackStream(aggregations); - - // 5. Expose Aggregations Stream - this.exposeOutputStream(aggregations); - - return this.builder.build(properties); - } - - private KTable<String, ActivePowerRecord> buildInputTable() { - final KStream<String, ActivePowerRecord> values = this.builder - .stream(this.inputTopic, Consumed.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - - final KStream<String, ActivePowerRecord> aggregationsInput = this.builder - .stream(this.feedbackTopic, Consumed.with( - Serdes.String(), - this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) - .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); - - final KTable<String, ActivePowerRecord> inputTable = values - .merge(aggregationsInput) - .groupByKey(Grouped.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())) - .reduce((aggr, value) -> value, Materialized.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - return inputTable; - } - - private KTable<String, Set<String>> buildParentSensorTable() { - final KStream<Event, String> configurationStream = this.builder - .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String())) - .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED - || key == Event.SENSOR_REGISTRY_STATUS); - - return configurationStream - .mapValues(data -> SensorRegistry.fromJson(data)) - .flatTransform(new ChildParentsTransformerSupplier()) - .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) - .aggregate( - () -> Set.<String>of(), - (key, newValue, oldValue) -> newValue.orElse(null), - Materialized.with(Serdes.String(), ParentsSerde.serde())); - } - - private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( - final KTable<String, Set<String>> parentSensorTable, - final KTable<String, ActivePowerRecord> inputTable) { - - return inputTable - .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) - .toStream() - .flatTransform(new JointFlatTransformerSupplier()) - .groupByKey(Grouped.with( - SensorParentKeySerde.serde(), - this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) - .reduce( - // TODO Configurable window aggregation function - (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, - Materialized.with( - SensorParentKeySerde.serde(), - this.srAvroSerdeFactory.forValues())); - } - - private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream( - final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { - return lastValueTable - .groupBy( - (k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v), - Grouped.with( - new WindowedSerdes.TimeWindowedSerde<>( - Serdes.String(), - this.emitPeriod.toMillis()), - this.srAvroSerdeFactory.forValues())) + this.builder + .stream(this.inputTopic, + Consumed.with(Serdes.String(), + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) + .groupByKey() + .windowedBy(TimeWindows.of(this.duration)) + // .aggregate( + // () -> 0.0, + // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), + // Materialized.with(Serdes.String(), Serdes.Double())) .aggregate( - () -> null, - this.recordAggregator::add, - this.recordAggregator::substract, + () -> Stats.of(), + (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), Materialized.with( - new WindowedSerdes.TimeWindowedSerde<>( - Serdes.String(), - this.emitPeriod.toMillis()), - this.srAvroSerdeFactory.forValues())) - // TODO timestamp -1 indicates that this record is emitted by an substract event - .filter((k, record) -> record.getTimestamp() != -1); - } - - private void exposeFeedbackStream( - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { - - aggregations + Serdes.String(), + GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() - .filter((k, record) -> record != null) - .selectKey((k, v) -> k.key()) - .to(this.feedbackTopic, Produced.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); - } - - private void exposeOutputStream( - final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + .map((k, s) -> KeyValue.pair(k.key(), s.toString())) + .peek((k, v) -> LOGGER.info(k + ": " + v)) + .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); - aggregations - // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) - .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded())) - .toStream() - .filter((k, record) -> record != null) - .selectKey((k, v) -> k.key()) - .to(this.outputTopic, Produced.with( - Serdes.String(), - this.srAvroSerdeFactory.forValues())); + return this.builder.build(properties); } } diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 7e077b101c0e1bfab359fc347ffe8c4acc9b88fc..1d6019f27cb78f6643e111095edbbdd9f6c03e1b 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -11,62 +11,33 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. */ -public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { - private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration emitPeriod; // NOPMD - private Duration gracePeriod; // NOPMD + private Duration windowDuration; // NOPMD public Uc2KafkaStreamsBuilder(final Configuration config) { super(config); } - public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { - this.feedbackTopic = feedbackTopic; - return this; - } - public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { this.outputTopic = outputTopic; return this; } - public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) { - this.emitPeriod = Objects.requireNonNull(emitPeriod); - return this; - } - - public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); + public Uc2KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; return this; } @Override protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.feedbackTopic, - this.configurationTopic, - this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java similarity index 91% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java rename to benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java index e97fbcd216c57a8aa965ee7a295c5633fa34810e..e4aff4fc80cea24c20be537f6aa5cda7c2be909a 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/util/StatsFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing.util; +package theodolite.uc2.streamprocessing.util; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; diff --git a/benchmarks/uc2-application/src/main/resources/META-INF/application.properties b/benchmarks/uc2-application/src/main/resources/META-INF/application.properties index 8f1af5f590eff7f2b12706d61a7c89d9152f7949..15293b1387b96688401bbc48bc2d1615c7b63aba 100644 --- a/benchmarks/uc2-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc2-application/src/main/resources/META-INF/application.properties @@ -3,11 +3,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input -kafka.configuration.topic=configuration -kafka.feedback.topic=aggregation-feedback kafka.output.topic=output +kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8091 - -emit.period.ms=5000 -grace.period.ms=0 \ No newline at end of file diff --git a/benchmarks/uc2-workload-generator/Dockerfile b/benchmarks/uc2-workload-generator/Dockerfile index 162243e055732de84d1680dba609425f4068dbc2..55593e0295efb0c4f7d4c484b1b104c256f9b958 100644 --- a/benchmarks/uc2-workload-generator/Dockerfile +++ b/benchmarks/uc2-workload-generator/Dockerfile @@ -1,6 +1,6 @@ -FROM openjdk:11-slim - -ADD build/distributions/uc2-workload-generator.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +FROM openjdk:11-slim + +ADD build/distributions/uc2-workload-generator.tar / + +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc2-workload-generator/bin/uc2-workload-generator \ No newline at end of file diff --git a/benchmarks/uc2-workload-generator/build.gradle b/benchmarks/uc2-workload-generator/build.gradle index b92e0c2edc54786ea957338b9981922f0a6a7b32..f2c3e5d2e73b655dffd94222ecfbc4fc31b7f722 100644 --- a/benchmarks/uc2-workload-generator/build.gradle +++ b/benchmarks/uc2-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 826387c484455fed4a7accb5dda56a66a4b63713..2c5b59bc19f703c4216bc02920b62bcf9da5d5fb 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,65 +1,19 @@ package theodolite.uc2.workloadgenerator; -import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KeySpace; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.SensorRegistry; /** * Load generator for Theodolite use case UC2. */ public final class LoadGenerator { - private static final int SLEEP_PERIOD = 30_000; - private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private LoadGenerator() {} - /** - * Start load generator. - */ public static void main(final String[] args) { - final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( - System.getenv("SEND_REGISTRY"), - "true")); - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final int numSensors = Integer.parseInt(Objects.requireNonNullElse( - System.getenv("NUM_SENSORS"), - "1")); - final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( - System.getenv("NUM_NESTED_GROUPS"), - "1")); - - // Build sensor hierarchy - final SensorRegistry sensorRegistry = - new SensorRegistryBuilder(numNestedGroups, numSensors).build(); - LOGGER.info("Start workload generator for use case UC2"); - theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() - .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) - .withBeforeAction(() -> { - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - LOGGER.info("Configuration sent."); - - LOGGER.info("Now wait 30 seconds..."); - try { - Thread.sleep(SLEEP_PERIOD); - } catch (final InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - LOGGER.info("...and start generating load."); - } - }) - .run(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } - } diff --git a/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891754324a6e1bf55348b6a38f592bb301..fa98ca63d77bdee891150bd6713f70197a75cefc 100644 --- a/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc3-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc3-application/Dockerfile b/benchmarks/uc3-application/Dockerfile index c70a24268e114e924b5f06dc7a8979100f5d8455..61141baaf752af4b596c8a04cd0d7cc2e6d740af 100644 --- a/benchmarks/uc3-application/Dockerfile +++ b/benchmarks/uc3-application/Dockerfile @@ -1,8 +1,6 @@ FROM openjdk:11-slim - ADD build/distributions/uc3-application.tar / - CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ - /uc3-application/bin/uc3-application \ No newline at end of file + /uc3-application/bin/uc3-application diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 349512f988bb182d8851e458a1bce244c756bbfe..84fb29969d2ce37a1d443752790379b1af634df5 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -1,7 +1,6 @@ package theodolite.uc3.application; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; @@ -19,8 +18,6 @@ public class HistoryService { private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - private final int windowDurationMinutes = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); /** * Start the service. @@ -34,11 +31,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { + // Use case specific stream configuration final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + .aggregtionDuration( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) + .aggregationAdvance( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + // Configuration of the stream application final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java similarity index 96% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java index 97807e3bdecf4000cc2edeed364b8f9d1bc9bb8e..549674f9f546a26d38491195edc2139aeadd785b 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKey.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.util.Objects; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java similarity index 92% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java index edb9ad2b20ac645dfade840130e1be67d2505304..837ca9d32e1a353917adcd3f70eb1af51d801613 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeyFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.time.LocalDateTime; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java similarity index 96% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java index ff404ab121ca2e60da65f11d89b8ec5849bd600d..6855907e7f357d681c3bd9a6054bf15ad29711ed 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayKeySerde.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java similarity index 95% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java index 7249309cea036bff9203ce9a7aa32489f69edebe..dfa9b95b08b95bf29621969c56a1e76cdcfc7877 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/HourOfDayRecordFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import org.apache.kafka.streams.kstream.Windowed; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java similarity index 98% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java index 8f693d5d3d309eb73a017b8d33dfcd63e70724fb..342cb3e04cd632fc4e8129de0bad6f12e8119dfa 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/RecordDatabaseAdapter.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.util.Collection; import java.util.List; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java similarity index 88% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java index cf67efbd34362c337a956d80f14731cf9b9d6b77..0e414c4a13f1cf7df1da5f0026b6de82e1c1c6ce 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsKeyFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import java.time.LocalDateTime; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java similarity index 94% rename from benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java rename to benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java index 79eb4b9f76e4429cf84d0af0e56875ea0386e218..31935df9db0949b05e602109b3edc23dee9499af 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/StatsRecordFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc4.streamprocessing; +package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import org.apache.avro.specific.SpecificRecord; diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index d6d6d4ffb7ebb1236be73dd681c900311853e732..1e976c07158720b3681d89413a5f277b1395f32d 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -2,17 +2,20 @@ package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Properties; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import theodolite.uc3.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -23,12 +26,16 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter + private final String inputTopic; private final String outputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Duration duration; + private final Duration aggregtionDuration; + private final Duration aggregationAdvance; private final StreamsBuilder builder = new StreamsBuilder(); @@ -37,37 +44,51 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic, final String outputTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, - final Duration duration) { + final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.srAvroSerdeFactory = srAvroSerdeFactory; - this.duration = duration; + this.aggregtionDuration = aggregtionDuration; + this.aggregationAdvance = aggregationAdvance; } /** * Build the {@link Topology} for the History microservice. */ public Topology build(final Properties properties) { + final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); + this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .groupByKey() - .windowedBy(TimeWindows.of(this.duration)) - // .aggregate( - // () -> 0.0, - // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), - // Materialized.with(Serdes.String(), Serdes.Double())) + .selectKey((key, value) -> { + final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); + return keyFactory.createKey(value.getIdentifier(), dateTime); + }) + .groupByKey( + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) + .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), - Materialized.with( - Serdes.String(), + Materialized.with(keySerde, GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() - .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> LOGGER.info(k + ": " + v)) - .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); + .map((key, stats) -> KeyValue.pair( + keyFactory.getSensorId(key.key()), + stats.toString())) + // TODO + // statsRecordFactory.create(key, value))) + // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging + .to( + this.outputTopic, + Produced.with( + Serdes.String(), + Serdes.String())); + // this.serdes.avroValues())); return this.builder.build(properties); } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 70113271a9d3c23499b85c07bf9d0a76db59f820..ea9b064602b1aa7cf7350826da18990ae3191d43 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -14,7 +14,8 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD public Uc3KafkaStreamsBuilder(final Configuration config) { super(config); @@ -25,8 +26,13 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { return this; } - public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; + public Uc3KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc3KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; return this; } @@ -34,10 +40,16 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), + this.aggregtionDuration, + this.aggregationAdvance); - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc3-application/src/main/resources/META-INF/application.properties b/benchmarks/uc3-application/src/main/resources/META-INF/application.properties index 011406f7ef1e23647eeae150d349f472214cbcd4..1273441a61763325c812541e1af8c243f81a31a5 100644 --- a/benchmarks/uc3-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc3-application/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 +aggregation.duration.days=30 +aggregation.advance.days=1 schema.registry.url=http://localhost:8091 diff --git a/benchmarks/uc3-workload-generator/Dockerfile b/benchmarks/uc3-workload-generator/Dockerfile index 6efd5ec6163815c467ef22e18f3d2cc1e0e3259a..8422c9d5371b86ced0a38c141c461aef452133ac 100644 --- a/benchmarks/uc3-workload-generator/Dockerfile +++ b/benchmarks/uc3-workload-generator/Dockerfile @@ -1,6 +1,6 @@ -FROM openjdk:11-slim - -ADD build/distributions/uc3-workload-generator.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ - /uc3-workload-generator/bin/uc3-workload-generator \ No newline at end of file +FROM openjdk:11-slim + +ADD build/distributions/uc3-workload-generator.tar / + +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ + /uc3-workload-generator/bin/uc3-workload-generator diff --git a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 662113fd1ae76e64d13933a01d18d9d08e950613..97527abfdd86f5ea39c20c3da31cd7cd26b674e5 100644 --- a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -10,10 +10,13 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private LoadGenerator() {} + private LoadGenerator() { + throw new UnsupportedOperationException(); + } public static void main(final String[] args) { LOGGER.info("Start workload generator for use case UC3"); theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } + } diff --git a/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891754324a6e1bf55348b6a38f592bb301..fa98ca63d77bdee891150bd6713f70197a75cefc 100644 --- a/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc4-application/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true diff --git a/benchmarks/uc4-application/Dockerfile b/benchmarks/uc4-application/Dockerfile index 8cb65188ab9885af0dc4e243319969626cb74d62..add251c0ef11324830bcada9174fbbdecc18d532 100644 --- a/benchmarks/uc4-application/Dockerfile +++ b/benchmarks/uc4-application/Dockerfile @@ -1,8 +1,6 @@ FROM openjdk:11-slim - ADD build/distributions/uc4-application.tar / - -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc4-application/bin/uc4-application diff --git a/benchmarks/uc2-application/README.md b/benchmarks/uc4-application/README.md similarity index 100% rename from benchmarks/uc2-application/README.md rename to benchmarks/uc4-application/README.md diff --git a/benchmarks/uc4-application/build.gradle b/benchmarks/uc4-application/build.gradle index 56663022144166711d6bebce0f6480e358a738b5..9cb1b311d8f50769d371952db886e4a00a454591 100644 --- a/benchmarks/uc4-application/build.gradle +++ b/benchmarks/uc4-application/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc4.application.HistoryService" +mainClassName = "theodolite.uc4.application.AggregationService" diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java similarity index 85% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java index 2f828278f5a3033c3e479bf82f3c8c5d9d4c380c..5c9d0910e7fbc60e58b13fc838f7ef2407de2aa3 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/application/AggregationService.java @@ -1,11 +1,11 @@ -package theodolite.uc2.application; +package theodolite.uc4.application; import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; -import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; +import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; /** @@ -36,15 +36,15 @@ public class AggregationService { * @param clusterSession the database session which the application should use. */ private void createKafkaStreamsApplication() { - final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); - uc2KafkaStreamsBuilder + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); + uc4KafkaStreamsBuilder .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); - final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java similarity index 99% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java index d4f9097ad0fa176842872e43f2f69a8616a65166..db28c86bce79caa4345a3a2bc7914c3e2bbd1a32 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformer.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java index 2b2d71c2f95d052cee19394e3e62e674776f8627..d17757d6800890eaf5260af9c25914344ca4a625 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerSupplier.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ChildParentsTransformerSupplier.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java similarity index 98% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java index 724c7f6e2eaebc7be53f03b89d143d885c4a055c..d3500adff664cba8f3f92707a0adba34534404b7 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformer.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import com.google.common.base.MoreObjects; import java.util.ArrayList; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java index 7d9a7df3d465260623abef2b13e9f3765925bc57..51c7ce1f6cb144c88356ef1b32bdfce400e1ffb4 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerSupplier.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointFlatTransformerSupplier.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Map; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java index cba05f1ed8e585d5c31aaa92207e0d2854436736..e9a5a824e43dfbab83151da5c2a8f18f9105f494 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/JointRecordParents.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Objects; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java index 5cb8f1ed8fcc1cecff1eefa4922531555a78c25f..a1e9767da047951e04d4c3914c2d1b36bd18626b 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/OptionalParentsSerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.HashSet; import java.util.Optional; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java index 266eaad015979a9e4ae748f7647ddcaf5947c78b..df6f848b5dfde10a96aceaf4d4a293364d52b982 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/ParentsSerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.HashSet; import java.util.Set; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java similarity index 97% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java index 9564e994da8fc909147bec76097c737f14247868..34ef3762d6a3219958329762ce6e39844684068a 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordAggregator.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; import titan.ccp.model.records.ActivePowerRecord; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java similarity index 96% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java index a4fb5b33966882b94d46c96282bdaaed92d67ebd..667cc6d5ee83a41f7c04fc8074a18ef1a9422b0e 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKey.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Objects; diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java similarity index 95% rename from benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java rename to benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java index d6773c6159f1d04ddf1c3f36fd25447575befce8..63b9e44b5a7bde8f47fe7620b286aefa7fc60841 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/SensorParentKeySerde.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index a0c87ba4702b9c3f191291a3f04679cc73fcb04b..623870313cd341d0594fee38d2fd0ae297abbeae 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -1,95 +1,206 @@ package theodolite.uc4.streamprocessing; -import com.google.common.math.Stats; import java.time.Duration; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Properties; -import org.apache.kafka.common.serialization.Serde; +import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; -import theodolite.uc4.streamprocessing.util.StatsFactory; -import titan.ccp.common.kafka.GenericSerde; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.configuration.events.Event; +import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; +import titan.ccp.model.sensorregistry.SensorRegistry; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - - // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - - private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter - - + // Streams Variables private final String inputTopic; + private final String feedbackTopic; private final String outputTopic; + private final String configurationTopic; + private final Duration emitPeriod; + private final Duration gracePeriod; + + // Serdes private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Duration aggregtionDuration; - private final Duration aggregationAdvance; private final StreamsBuilder builder = new StreamsBuilder(); + private final RecordAggregator recordAggregator = new RecordAggregator(); /** * Create a new {@link TopologyBuilder} using the given topics. + * + * @param inputTopic The topic where to read sensor measurements from. + * @param configurationTopic The topic where the hierarchy of the sensors is published. + * @param feedbackTopic The topic where aggregation results are written to for feedback. + * @param outputTopic The topic where to publish aggregation results. + * @param emitPeriod The Duration results are emitted with. + * @param gracePeriod The Duration for how long late arriving records are considered. + * @param srAvroSerdeFactory Factory for creating avro SERDEs + * */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, - final Duration aggregtionDuration, final Duration aggregationAdvance) { + final String feedbackTopic, final String configurationTopic, + final Duration emitPeriod, final Duration gracePeriod, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.feedbackTopic = feedbackTopic; + this.configurationTopic = configurationTopic; + this.emitPeriod = emitPeriod; + this.gracePeriod = gracePeriod; + this.srAvroSerdeFactory = srAvroSerdeFactory; - this.aggregtionDuration = aggregtionDuration; - this.aggregationAdvance = aggregationAdvance; } /** - * Build the {@link Topology} for the History microservice. + * Build the {@link Topology} for the Aggregation microservice. */ public Topology build(final Properties properties) { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); - final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - - this.builder - .stream(this.inputTopic, - Consumed.with(Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .selectKey((key, value) -> { - final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); - final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); - return keyFactory.createKey(value.getIdentifier(), dateTime); - }) - .groupByKey( - Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) + // 1. Build Parent-Sensor Table + final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); + + // 2. Build Input Table + final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable(); + + // 3. Build Last Value Table from Input and Parent-Sensor Table + final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable = + this.buildLastValueTable(parentSensorTable, inputTable); + + // 4. Build Aggregations Stream + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations = + this.buildAggregationStream(lastValueTable); + + // 6. Expose Feedback Stream + this.exposeFeedbackStream(aggregations); + + // 5. Expose Aggregations Stream + this.exposeOutputStream(aggregations); + + return this.builder.build(properties); + } + + private KTable<String, ActivePowerRecord> buildInputTable() { + final KStream<String, ActivePowerRecord> values = this.builder + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + + final KStream<String, ActivePowerRecord> aggregationsInput = this.builder + .stream(this.feedbackTopic, Consumed.with( + Serdes.String(), + this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) + .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); + + final KTable<String, ActivePowerRecord> inputTable = values + .merge(aggregationsInput) + .groupByKey(Grouped.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())) + .reduce((aggr, value) -> value, Materialized.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + return inputTable; + } + + private KTable<String, Set<String>> buildParentSensorTable() { + final KStream<Event, String> configurationStream = this.builder + .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String())) + .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED + || key == Event.SENSOR_REGISTRY_STATUS); + + return configurationStream + .mapValues(data -> SensorRegistry.fromJson(data)) + .flatTransform(new ChildParentsTransformerSupplier()) + .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) .aggregate( - () -> Stats.of(), - (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), - Materialized.with(keySerde, - GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) + () -> Set.<String>of(), + (key, newValue, oldValue) -> newValue.orElse(null), + Materialized.with(Serdes.String(), ParentsSerde.serde())); + } + + private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( + final KTable<String, Set<String>> parentSensorTable, + final KTable<String, ActivePowerRecord> inputTable) { + + return inputTable + .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) .toStream() - .map((key, stats) -> KeyValue.pair( - keyFactory.getSensorId(key.key()), - stats.toString())) - // TODO - // statsRecordFactory.create(key, value))) - // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging - .to( - this.outputTopic, - Produced.with( - Serdes.String(), - Serdes.String())); - // this.serdes.avroValues())); + .flatTransform(new JointFlatTransformerSupplier()) + .groupByKey(Grouped.with( + SensorParentKeySerde.serde(), + this.srAvroSerdeFactory.forValues())) + .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) + .reduce( + // TODO Configurable window aggregation function + (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, + Materialized.with( + SensorParentKeySerde.serde(), + this.srAvroSerdeFactory.forValues())); + } - return this.builder.build(properties); + private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream( + final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { + return lastValueTable + .groupBy( + (k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v), + Grouped.with( + new WindowedSerdes.TimeWindowedSerde<>( + Serdes.String(), + this.emitPeriod.toMillis()), + this.srAvroSerdeFactory.forValues())) + .aggregate( + () -> null, + this.recordAggregator::add, + this.recordAggregator::substract, + Materialized.with( + new WindowedSerdes.TimeWindowedSerde<>( + Serdes.String(), + this.emitPeriod.toMillis()), + this.srAvroSerdeFactory.forValues())) + // TODO timestamp -1 indicates that this record is emitted by an substract event + .filter((k, record) -> record.getTimestamp() != -1); + } + + private void exposeFeedbackStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + + aggregations + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) + .to(this.feedbackTopic, Produced.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); + } + + private void exposeOutputStream( + final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) { + + aggregations + // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) + .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded())) + .toStream() + .filter((k, record) -> record != null) + .selectKey((k, v) -> k.key()) + .to(this.outputTopic, Produced.with( + Serdes.String(), + this.srAvroSerdeFactory.forValues())); } } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 67c652967194f59db560b8ad6fd86410725b3c9c..9f1af3ba066bcdfef7f8e9073947d570a1327515 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -11,44 +11,61 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. */ -public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD + private String configurationTopic; // NOPMD + private Duration emitPeriod; // NOPMD + private Duration gracePeriod; // NOPMD public Uc4KafkaStreamsBuilder(final Configuration config) { super(config); } + public Uc4KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { + this.feedbackTopic = feedbackTopic; + return this; + } + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { this.outputTopic = outputTopic; return this; } - public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; + public Uc4KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc4KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) { + this.emitPeriod = Objects.requireNonNull(emitPeriod); return this; } - public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; + public Uc4KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); return this; } @Override protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), - this.aggregtionDuration, - this.aggregationAdvance); + this.feedbackTopic, + this.configurationTopic, + this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); return topologyBuilder.build(properties); } diff --git a/benchmarks/uc4-application/src/main/resources/META-INF/application.properties b/benchmarks/uc4-application/src/main/resources/META-INF/application.properties index b46681533e63bf86a51439778a46940da348559d..ce06091076e6ff7f9ede355c7f54c12b3d872119 100644 --- a/benchmarks/uc4-application/src/main/resources/META-INF/application.properties +++ b/benchmarks/uc4-application/src/main/resources/META-INF/application.properties @@ -3,8 +3,11 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input +kafka.configuration.topic=configuration +kafka.feedback.topic=aggregation-feedback kafka.output.topic=output -aggregation.duration.days=30 -aggregation.advance.days=1 schema.registry.url=http://localhost:8091 + +emit.period.ms=5000 +grace.period.ms=0 \ No newline at end of file diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java similarity index 95% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java index 54e8c460e642d53bb013ef6888570d6fc36ff614..600fc0b15ccc3ac3d902565fba1d073e37d98d0f 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/OptionalParentsSerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Optional; import java.util.Set; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java similarity index 91% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java index f12604d6a19ca36e9c151210005c910b37908307..994593e27914af2ad56693e4b08b8143b27000b7 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/ParentsSerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Set; import org.junit.Test; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java similarity index 92% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java index 7ca99bcb79baeb5f95a8270b99a559f2f108867e..34f87fa98ca7de7d6ca24a49a73729e5ecc2e74b 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SensorParentKeySerdeTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.junit.Test; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java similarity index 94% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java index 8e9f5a3608e5bae032c6e79b7cd059a0776987c2..b5d5f942dac068379fe90a7462545adb7a11e7df 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTester.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import static org.junit.Assert.assertEquals; import java.util.function.Function; diff --git a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java similarity index 94% rename from benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java rename to benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java index 5cdbfc60574bfc924423516f80ec61850853bcff..e8083ed778c450ef6717ca7b9c73daa3d96a7af3 100644 --- a/benchmarks/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java +++ b/benchmarks/uc4-application/src/test/java/theodolite/uc4/streamprocessing/SerdeTesterFactory.java @@ -1,4 +1,4 @@ -package theodolite.uc2.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.common.serialization.Serde; diff --git a/benchmarks/uc4-workload-generator/Dockerfile b/benchmarks/uc4-workload-generator/Dockerfile index 8f077637acb82e23ee69a8df749baeb72b3098af..f39923e59d3079d3b163ffc5d2e4906599de026d 100644 --- a/benchmarks/uc4-workload-generator/Dockerfile +++ b/benchmarks/uc4-workload-generator/Dockerfile @@ -2,5 +2,5 @@ FROM openjdk:11-slim ADD build/distributions/uc4-workload-generator.tar / -CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ /uc4-workload-generator/bin/uc4-workload-generator diff --git a/benchmarks/uc4-workload-generator/build.gradle b/benchmarks/uc4-workload-generator/build.gradle index 76bbce013b67bab325bac06c1986693da3028f0c..8865ec9391213f3d8c52be2366573dee09652087 100644 --- a/benchmarks/uc4-workload-generator/build.gradle +++ b/benchmarks/uc4-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc4.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc4.workloadgenerator.LoadGenerator" diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java similarity index 98% rename from benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java rename to benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java index ad24e8e4bc8f86b7ed4d5dc2822622f8da22d6d1..ad0ee7082da9116f9ccb66a79d48b36bfb30da2e 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; diff --git a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index c0d885ed1730e0d658a7d176d21d7c57529c55b0..8320d16b98fa1d253064d08397d5df1bb8e17b79 100644 --- a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,22 +1,65 @@ package theodolite.uc4.workloadgenerator; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.KeySpace; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.sensorregistry.SensorRegistry; /** * Load generator for Theodolite use case UC4. */ public final class LoadGenerator { + private static final int SLEEP_PERIOD = 30_000; + private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private LoadGenerator() { - throw new UnsupportedOperationException(); - } + private LoadGenerator() {} + /** + * Start load generator. + */ public static void main(final String[] args) { + final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv("SEND_REGISTRY"), + "true")); + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_SENSORS"), + "1")); + final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_NESTED_GROUPS"), + "1")); + + // Build sensor hierarchy + final SensorRegistry sensorRegistry = + new SensorRegistryBuilder(numNestedGroups, numSensors).build(); + LOGGER.info("Start workload generator for use case UC4"); - theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() + .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) + .withBeforeAction(() -> { + if (sendRegistry) { + final ConfigPublisher configPublisher = + new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + LOGGER.info("Configuration sent."); + + LOGGER.info("Now wait 30 seconds..."); + try { + Thread.sleep(SLEEP_PERIOD); + } catch (final InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + LOGGER.info("...and start generating load."); + } + }) + .run(); } } diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java similarity index 97% rename from benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java rename to benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java index 7c34ac89471386f4ddd508a304f2197602beab27..60303056a01466b908b73e51377427f5d8347441 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilder.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; diff --git a/benchmarks/uc2-workload-generator/src/main/resources/META-INF/application.properties b/benchmarks/uc4-workload-generator/src/main/resources/META-INF/application.properties similarity index 100% rename from benchmarks/uc2-workload-generator/src/main/resources/META-INF/application.properties rename to benchmarks/uc4-workload-generator/src/main/resources/META-INF/application.properties diff --git a/benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java b/benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java similarity index 97% rename from benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java rename to benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java index 17b208edac4acafa92b7a75e053e2fe97a9afdb6..424c84ec96cdd90077fb7934686cd021b040e732 100644 --- a/benchmarks/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java +++ b/benchmarks/uc4-workload-generator/src/test/java/theodolite/uc4/workloadgenerator/SensorRegistryBuilderTest.java @@ -1,4 +1,4 @@ -package theodolite.uc2.workloadgenerator; +package theodolite.uc4.workloadgenerator; import java.util.Collection;