From c0a1dba3c4cc4ec83ee71b7e4ab0690235232810 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Fri, 18 Dec 2020 19:35:20 +0100
Subject: [PATCH] enable topic optimization

---
 .../theodolite/commons/kafkastreams/KafkaStreamsBuilder.java | 5 +++--
 .../theodolite/uc1/streamprocessing/TopologyBuilder.java     | 5 +++--
 .../uc1/streamprocessing/Uc1KafkaStreamsBuilder.java         | 5 +++--
 .../theodolite/uc2/streamprocessing/TopologyBuilder.java     | 5 +++--
 .../uc2/streamprocessing/Uc2KafkaStreamsBuilder.java         | 5 +++--
 .../theodolite/uc3/streamprocessing/TopologyBuilder.java     | 5 +++--
 .../uc3/streamprocessing/Uc3KafkaStreamsBuilder.java         | 5 +++--
 .../theodolite/uc4/streamprocessing/TopologyBuilder.java     | 5 +++--
 .../uc4/streamprocessing/Uc4KafkaStreamsBuilder.java         | 5 +++--
 9 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
index ef1ece354..89bd3147f 100644
--- a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
+++ b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder {
    *
    * @return A {@code Topology} for a {@code KafkaStreams} application.
    */
-  protected abstract Topology buildTopology();
+  protected abstract Topology buildTopology(Properties properties);
 
   /**
    * Builds the {@link KafkaStreams} instance.
    */
   public KafkaStreams build() {
     // Create the Kafka streams instance.
-    return new KafkaStreams(this.buildTopology(), this.buildProperties());
+    final Properties properties = this.buildProperties();
+    return new KafkaStreams(this.buildTopology(properties), properties);
   }
 
 }
diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
index 1c30e0c2c..75c833aa7 100644
--- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
+++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
@@ -1,6 +1,7 @@
 package theodolite.uc1.streamprocessing;
 
 import com.google.gson.Gson;
+import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
@@ -36,7 +37,7 @@ public class TopologyBuilder {
   /**
    * Build the {@link Topology} for the History microservice.
    */
-  public Topology build() {
+  public Topology build(final Properties properties) {
     this.builder
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
@@ -44,6 +45,6 @@ public class TopologyBuilder {
         .mapValues(v -> this.gson.toJson(v))
         .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
 
-    return this.builder.build();
+    return this.builder.build(properties);
   }
 }
diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
index 143352828..cc39bb046 100644
--- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
+++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
@@ -1,6 +1,7 @@
 package theodolite.uc1.streamprocessing;
 
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
   }
 
   @Override
-  protected Topology buildTopology() {
+  protected Topology buildTopology(final Properties properties) {
     Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
     return new TopologyBuilder(this.inputTopic,
-        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build();
+        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties);
   }
 }
diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
index 6a254d9b7..3dba062b3 100644
--- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
+++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
@@ -1,6 +1,7 @@
 package theodolite.uc2.streamprocessing;
 
 import java.time.Duration;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
@@ -71,7 +72,7 @@ public class TopologyBuilder {
   /**
    * Build the {@link Topology} for the Aggregation microservice.
    */
-  public Topology build() {
+  public Topology build(final Properties properties) {
     // 1. Build Parent-Sensor Table
     final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
 
@@ -92,7 +93,7 @@ public class TopologyBuilder {
     // 5. Expose Aggregations Stream
     this.exposeOutputStream(aggregations);
 
-    return this.builder.build();
+    return this.builder.build(properties);
   }
 
   private KTable<String, ActivePowerRecord> buildInputTable() {
diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
index 1a606ee3d..78f73268e 100644
--- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
+++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
   }
 
   @Override
-  protected Topology buildTopology() {
+  protected Topology buildTopology(final Properties properties) {
     Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
     Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
     Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
@@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
         this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
         new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
 
-    return topologyBuilder.build();
+    return topologyBuilder.build(properties);
   }
 
 }
diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
index 74eed74c5..d6d6d4ffb 100644
--- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
+++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
 
 import com.google.common.math.Stats;
 import java.time.Duration;
+import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -46,7 +47,7 @@ public class TopologyBuilder {
   /**
    * Build the {@link Topology} for the History microservice.
    */
-  public Topology build() {
+  public Topology build(final Properties properties) {
     this.builder
         .stream(this.inputTopic,
             Consumed.with(Serdes.String(),
@@ -68,6 +69,6 @@ public class TopologyBuilder {
         .peek((k, v) -> LOGGER.info(k + ": " + v))
         .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
-    return this.builder.build();
+    return this.builder.build(properties);
   }
 }
diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
index 9ab4ea0a9..70113271a 100644
--- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
+++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
   }
 
   @Override
-  protected Topology buildTopology() {
+  protected Topology buildTopology(final Properties properties) {
     Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
     Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
     Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
 
     final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
         new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
-    return topologyBuilder.build();
+    return topologyBuilder.build(properties);
   }
 
 }
diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
index a92abae6e..a0c87ba47 100644
--- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
+++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
@@ -5,6 +5,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.Properties;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
@@ -54,7 +55,7 @@ public class TopologyBuilder {
   /**
    * Build the {@link Topology} for the History microservice.
    */
-  public Topology build() {
+  public Topology build(final Properties properties) {
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
     final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
 
@@ -89,6 +90,6 @@ public class TopologyBuilder {
                 Serdes.String()));
     // this.serdes.avroValues()));
 
-    return this.builder.build();
+    return this.builder.build(properties);
   }
 }
diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
index bbbb04311..67c652967 100644
--- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
+++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
   }
 
   @Override
-  protected Topology buildTopology() {
+  protected Topology buildTopology(final Properties properties) {
     Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
     Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
     Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
         this.aggregtionDuration,
         this.aggregationAdvance);
 
-    return topologyBuilder.build();
+    return topologyBuilder.build(properties);
   }
 
 }
-- 
GitLab