From 42b048d08f40d906bbe9a810073552dc7fac4ef2 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 12 Nov 2021 15:42:47 +0100
Subject: [PATCH] Rename beam-common kafkaReader and outsource
 uc3-MapTimeFormatFunction

---
 ...java => KafkaActivePowerRecordReader.java} |  1 -
 .../main/java/application/MapTimeFormat.java  | 25 +++++++++++++++++++
 .../java/application/Uc3ApplicationBeam.java  | 15 +----------
 3 files changed, 26 insertions(+), 15 deletions(-)
 rename theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/{KafkaAggregatedPowerRecordReader.java => KafkaActivePowerRecordReader.java} (99%)
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
similarity index 99%
rename from theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
rename to theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
index c91987804..1e97ec57c 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
@@ -21,7 +21,6 @@ public class KafkaAggregatedPowerRecordReader extends
   private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
 
 
-
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java
new file mode 100644
index 000000000..1c61d7272
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java
@@ -0,0 +1,25 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.model.records.ActivePowerRecord;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
+    ActivePowerRecord>> {
+    private final ZoneId zone = ZoneId.of("Europe/Paris");
+  final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+
+    @Override
+    public KV<application.HourOfDayKey, ActivePowerRecord> apply(
+    final KV<String, ActivePowerRecord> kv) {
+      final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
+      final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
+      return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
+          kv.getValue());
+    }
+  }
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
index b80b0dfd3..a058e6c58 100644
--- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
@@ -123,20 +123,7 @@ public final class Uc3ApplicationBeam {
     // Read from Kafka
     pipeline.apply(kafka)
         // Map to correct time format
-        .apply(MapElements.via(
-            new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
-                ActivePowerRecord>>() {
-              private final ZoneId zone = ZoneId.of("Europe/Paris");
-
-              @Override
-              public KV<application.HourOfDayKey, ActivePowerRecord> apply(
-                  final KV<String, ActivePowerRecord> kv) {
-                final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
-                final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
-                return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
-                    kv.getValue());
-              }
-            }))
+        .apply(MapElements.via(new MapTimeFormat()))
 
         // Apply a sliding window
         .apply(Window
-- 
GitLab