From 2b53d632a84b25a67ed130ecd55962a61841a629 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Sat, 26 Nov 2022 16:55:41 +0100
Subject: [PATCH] Align downsample intervals among engines

---
 .../benchmarks/commons/beam/ConfigurationKeys.java     |  2 +-
 .../resources/uc2-hazelcastjet-deployment.yaml         |  2 --
 .../docker-test/uc2-hazelcastjet/docker-compose.yml    |  1 -
 .../docker-test/uc2-kstreams/docker-compose.yml        |  1 -
 .../commons/hazelcastjet/ConfigurationKeys.java        |  2 +-
 .../benchmarks/commons/kstreams/ConfigurationKeys.java |  2 +-
 .../benchmarks/uc2/beam/PipelineFactory.java           |  6 +++---
 .../src/main/resources/META-INF/application.properties |  4 ++--
 .../benchmarks/uc2/flink/ConfigurationKeys.java        |  2 +-
 .../benchmarks/uc2/flink/HistoryServiceFlinkJob.java   |  5 ++---
 .../src/main/resources/META-INF/application.properties |  4 ++--
 .../benchmarks/uc2/hazelcastjet/HistoryService.java    | 10 ++++------
 .../src/main/resources/META-INF/application.properties |  3 +--
 .../benchmarks/uc2/kstreams/HistoryService.java        |  6 ++----
 .../src/main/resources/META-INF/application.properties |  4 ++--
 .../benchmarks/uc4/beam/PipelineFactory.java           |  2 +-
 16 files changed, 23 insertions(+), 33 deletions(-)

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java
index c22c164f6..a98aa89ff 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java
@@ -21,7 +21,7 @@ public final class ConfigurationKeys {
   public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
 
   // UC2
-  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+  public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
 
   // UC3
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml
index ae28c0709..0d9f73724 100644
--- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml
+++ b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml
@@ -21,8 +21,6 @@ spec:
               value: "theodolite-kafka-kafka-bootstrap:9092"
             - name: SCHEMA_REGISTRY_URL
               value: "http://theodolite-kafka-schema-registry:8081"
-            - name: DOWNSAMPLE_INTERVAL
-              value: "5000"
             #- name: KUBERNETES_DNS_NAME
             #  value: "titan-ccp-aggregation"
             - name: KUBERNETES_NAMESPACE
diff --git a/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml
index 1c6d352dc..55b0c457e 100644
--- a/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml
@@ -50,7 +50,6 @@ services:
       BOOTSTRAP_SERVER: benchmark:5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-      KAFKA_WINDOW_DURATION_MINUTES: 60
   load-generator: 
     image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
     depends_on:
diff --git a/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml
index efdba90be..f378fe86a 100755
--- a/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml
@@ -45,7 +45,6 @@ services:
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-      KAFKA_WINDOW_DURATION_MINUTES: 60
   load-generator: 
     image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
     depends_on:
diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
index c1f2e646f..44e82126d 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
@@ -21,7 +21,7 @@ public class ConfigurationKeys {
   public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
 
   // UC2
-  public static final String DOWNSAMPLE_INTERVAL = "kafka.window.duration.minutes";
+  public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
 
   // UC3
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java
index ca74aa7d9..6f1f70e72 100644
--- a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java
+++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java
@@ -28,7 +28,7 @@ public final class ConfigurationKeys {
   public static final String GRACE_PERIOD_MS = "grace.period.ms";
 
   // UC3
-  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+  public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
 
   // UC4
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java
index decbcae1c..f80856265 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java
@@ -42,8 +42,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
   protected void constructPipeline(final Pipeline pipeline) {
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
 
-    final Duration duration = Duration.standardMinutes(
-        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+    final Duration downsampleInterval = Duration.standardMinutes(
+        this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
 
     final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
 
@@ -58,7 +58,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
     // Apply pipeline transformations
     pipeline.apply(kafkaReader)
         // Apply a fixed window
-        .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
+        .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(downsampleInterval)))
         // Aggregate per window for every key
         .apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
diff --git a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties
index 557689f07..3f81e6005 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties
@@ -4,10 +4,10 @@ application.version=0.0.1
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
-kafka.window.duration.minutes=1
-
 schema.registry.url=http://localhost:8081
 
+downsample.interval.minutes=1
+
 specific.avro.reader=true
 
 # Kafka Settings
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java
index a2cdf8f84..8ea245eba 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java
@@ -17,7 +17,7 @@ public final class ConfigurationKeys {
 
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
 
-  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+  public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
 
   public static final String FLINK_STATE_BACKEND = "flink.state.backend";
 
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
index eda3c1a98..3ef6ddbd5 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
@@ -39,9 +39,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final int windowDurationMinutes =
-        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final Time windowDuration = Time.minutes(windowDurationMinutes);
+    final Time windowDuration = Time.minutes(
+        this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
diff --git a/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties
index 3e8b8e08e..f12c875e2 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties
@@ -6,7 +6,7 @@ kafka.input.topic=input
 kafka.output.topic=output
 schema.registry.url=http://localhost:8081
 
-kafka.window.duration.minutes=1
+downsample.interval.minutes=1
 
 # Flink configuration
-checkpointing.interval.ms=100
+checkpointing.interval.ms=1000
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
index ce95d1a8c..6104b8a70 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
@@ -2,7 +2,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 
 import com.google.common.math.StatsAccumulator;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
 import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -22,8 +21,8 @@ public class HistoryService extends HazelcastJetService {
 
 
   /**
-   * Constructs the use case logic for UC2.
-   * Retrieves the needed values and instantiates a pipeline factory.
+   * Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
+   * factory.
    */
   public HistoryService() {
     super(LOGGER);
@@ -38,11 +37,10 @@ public class HistoryService extends HazelcastJetService {
             StringSerializer.class.getCanonicalName());
 
     final String kafkaOutputTopic =
-        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+        this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
 
     final Duration downsampleInterval = Duration.ofMinutes(
-        Integer.parseInt(config.getProperty(
-            ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()));
+        this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
 
     this.pipelineFactory = new Uc2PipelineFactory(
         kafkaProps,
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties
index 32db468dc..636584ce9 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties
@@ -4,7 +4,6 @@ application.version=0.0.1
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
-kafka.window.duration.minutes=1
-
 schema.registry.url=http://localhost:8081
 
+downsample.interval.minutes=1
diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java
index 4afc2d91e..8e4d29053 100644
--- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java
+++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java
@@ -1,7 +1,6 @@
 package rocks.theodolite.benchmarks.uc2.kstreams;
 
 import java.time.Duration;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.KafkaStreams;
@@ -18,8 +17,6 @@ public class HistoryService {
   private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
-  private final int windowDurationMinutes = Integer
-      .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
 
   /**
    * Start the service.
@@ -36,7 +33,8 @@ public class HistoryService {
     final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
     uc2KafkaStreamsBuilder
         .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
-        .windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
+        .windowDuration(Duration.ofMinutes(
+            this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)));
 
     final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
 
diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties
index 7765cbff3..afba990c1 100644
--- a/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties
@@ -4,9 +4,9 @@ application.version=0.0.1
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
-kafka.window.duration.minutes=1
-
 schema.registry.url=http://localhost:8081
 
+downsample.interval.minutes=1
+
 # Kafka Streams Config
 commit.interval.ms=5000	
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java
index 955f71015..6627235dc 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java
@@ -73,7 +73,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
         this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
 
     final Duration duration = Duration.standardSeconds(
-        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+        this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
     final Duration triggerDelay = Duration.standardSeconds(
         this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
     final Duration gracePeriod = Duration.standardSeconds(
-- 
GitLab