From 3739718365185fc9ee5c6f41ec3c18abf7a78102 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 31 Jan 2022 19:01:21 +0100
Subject: [PATCH] Fix (and suppress) deprecation warnings

---
 .../kafkastreams/KafkaStreamsBuilder.java     | 31 ++++++++++---------
 .../uc2/streamprocessing/TopologyBuilder.java |  2 +-
 .../uc3/streamprocessing/TopologyBuilder.java | 13 ++++----
 .../uc4/streamprocessing/TopologyBuilder.java |  2 +-
 4 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
index 89bd3147f..fe3cf484a 100644
--- a/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
+++ b/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
@@ -70,18 +70,15 @@ public abstract class KafkaStreamsBuilder {
 
     // optional configurations
     this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
-        this.config::getLong,
-        p -> p >= 0);
+        this.config::getLong, p -> p >= 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG,
         this.config::getInt, p -> p > 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
-        this.config::getInt,
-        p -> p >= 0);
+        this.config::getInt, p -> p >= 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
         this.config::getInt, p -> p >= 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
-        this.config::getLong,
-        p -> p >= 0);
+        this.config::getLong, p -> p >= 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
         this.config::getInt, p -> p >= 1);
     this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
@@ -89,22 +86,28 @@ public abstract class KafkaStreamsBuilder {
     this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG,
         this.config::getInt, p -> p > 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG,
-        this.config::getLong,
-        p -> p >= 0);
+        this.config::getLong, p -> p >= 0);
     this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
-        this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p)
-            || StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p));
+        this.config::getString, this::validateProcessingGuarantee);
     this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG,
         this.config::getInt, p -> p >= 0);
 
-    if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION)
-        && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) {
-      propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+    if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)
+        && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+      propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
     }
 
     return propBuilder.build();
   }
 
+  @SuppressWarnings("deprecation")
+  private boolean validateProcessingGuarantee(final String processingGuarantee) {
+    return StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
+        // We continue support EXACTLY_ONCE to allow benchmarking it against v2
+        || StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)
+        || StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee);
+  }
+
   /**
    * Method to implement a {@link Topology} for a {@code KafkaStreams} application.
    *
@@ -116,7 +119,7 @@ public abstract class KafkaStreamsBuilder {
    * Builds the {@link KafkaStreams} instance.
    */
   public KafkaStreams build() {
-    // Create the Kafka streams instance.
+    // Create the Kafka Streams instance.
     final Properties properties = this.buildProperties();
     return new KafkaStreams(this.buildTopology(properties), properties);
   }
diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
index eda7c495a..21dcf14a9 100644
--- a/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
@@ -53,7 +53,7 @@ public class TopologyBuilder {
             Consumed.with(Serdes.String(),
                 this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
         .groupByKey()
-        .windowedBy(TimeWindows.of(this.duration))
+        .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
         // .aggregate(
         // () -> 0.0,
         // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
index 1e976c071..4c63e21f3 100644
--- a/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
@@ -60,17 +60,18 @@ public class TopologyBuilder {
     final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
 
     this.builder
-        .stream(this.inputTopic,
-            Consumed.with(Serdes.String(),
-                this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
+        .stream(this.inputTopic, Consumed.with(
+            Serdes.String(),
+            this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
         .selectKey((key, value) -> {
           final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
           final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
           return keyFactory.createKey(value.getIdentifier(), dateTime);
         })
-        .groupByKey(
-            Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
-        .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance))
+        .groupByKey(Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
+        .windowedBy(TimeWindows
+            .ofSizeWithNoGrace(this.aggregtionDuration)
+            .advanceBy(this.aggregationAdvance))
         .aggregate(
             () -> Stats.of(),
             (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
index 623870313..712b20cb6 100644
--- a/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java
@@ -146,7 +146,7 @@ public class TopologyBuilder {
         .groupByKey(Grouped.with(
             SensorParentKeySerde.serde(),
             this.srAvroSerdeFactory.forValues()))
-        .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
+        .windowedBy(TimeWindows.ofSizeAndGrace(this.emitPeriod, this.gracePeriod))
         .reduce(
             // TODO Configurable window aggregation function
             (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
-- 
GitLab