diff --git a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ef1ece3549b1aabf60a4ff5b15028b7e50288cd9..89bd3147f0d3bb7a5fecc5d8c7d277bd294494ad 100644 --- a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder { * * @return A {@code Topology} for a {@code KafkaStreams} application. */ - protected abstract Topology buildTopology(); + protected abstract Topology buildTopology(Properties properties); /** * Builds the {@link KafkaStreams} instance. */ public KafkaStreams build() { // Create the Kafka streams instance. - return new KafkaStreams(this.buildTopology(), this.buildProperties()); + final Properties properties = this.buildProperties(); + return new KafkaStreams(this.buildTopology(properties), properties); } } diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 1c30e0c2c83b3d8a2f3dca4df0c7aec99cc4f450..75c833aa722654395b1adc6f739395eea5256820 100644 --- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import com.google.gson.Gson; +import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; @@ -36,7 +37,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the History microservice. */ - public Topology build() { + public Topology build(final Properties properties) { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), @@ -44,6 +45,6 @@ public class TopologyBuilder { .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 14335282863bff5a170716b228ea363e3d739685..cc39bb04623c06a4d41cb2c695804ed41818a67c 100644 --- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); return new TopologyBuilder(this.inputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties); } } diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index 6a254d9b75ae3d9d39cf9dd887f6d4fccb6119c4..3dba062b31549712f4df2dc84f3a4f71572999d8 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; +import java.util.Properties; import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -71,7 +72,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the Aggregation microservice. */ - public Topology build() { + public Topology build(final Properties properties) { // 1. Build Parent-Sensor Table final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); @@ -92,7 +93,7 @@ public class TopologyBuilder { // 5. Expose Aggregations Stream this.exposeOutputStream(aggregations); - return this.builder.build(); + return this.builder.build(properties); } private KTable<String, ActivePowerRecord> buildInputTable() { diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 1a606ee3df5e6ac2f43b650afe4a9aed036df9cd..78f73268e0ed7a30d5fe6105bab7d4b05897527f 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); @@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index 74eed74c52a78df229c02542bc6e66d7f796c2c7..d6d6d4ffb7ebb1236be73dd681c900311853e732 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; +import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -46,7 +47,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the History microservice. */ - public Topology build() { + public Topology build(final Properties properties) { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), @@ -68,6 +69,6 @@ public class TopologyBuilder { .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 9ab4ea0a96c663af09008bd5358066ca3f8520ac..70113271a9d3c23499b85c07bf9d0a76db59f820 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index a92abae6e11c4bf66a5d8d8dee0f10b088e8274b..a0c87ba4702b9c3f191291a3f04679cc73fcb04b 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Properties; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -54,7 +55,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the History microservice. */ - public Topology build() { + public Topology build(final Properties properties) { final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); @@ -89,6 +90,6 @@ public class TopologyBuilder { Serdes.String())); // this.serdes.avroValues())); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index bbbb043119857612b1a8b0c60e3a5466cd68447e..67c652967194f59db560b8ad6fd86410725b3c9c 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); @@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { this.aggregtionDuration, this.aggregationAdvance); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } }