From 149b60448c2acea53babf89ff22cb863854b17bc Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 17 Dec 2021 11:45:06 +0100
Subject: [PATCH] Add TimestampPolicy to uc1-2

---
 .../src/main/java/application/Uc1BeamPipeline.java     | 10 +++-------
 .../src/main/java/application/Uc2BeamPipeline.java     |  9 +++------
 2 files changed, 6 insertions(+), 13 deletions(-)

diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index 46fa53756..b146ce465 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -5,14 +5,10 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.configuration2.Configuration;
 import theodolite.commons.beam.AbstractPipeline;
-import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
+import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
 
 
@@ -37,8 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
     final Map<String, Object> consumerConfig = buildConsumerConfig();
 
     // Create Pipeline transformations
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+    final KafkaActivePowerTimestampReader kafka =
+        new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
 
     final LogKeyValue logKeyValue = new LogKeyValue();
     final MapToGson mapToGson = new MapToGson();
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
index 3b43dc47a..e69ccce29 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
@@ -11,18 +11,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.ConfigurationKeys;
-import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
+import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import theodolite.commons.beam.kafka.KafkaWriterTransformation;
 import titan.ccp.model.records.ActivePowerRecord;
 
@@ -58,9 +55,9 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
 
 
     // Read from Kafka
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
+    final KafkaActivePowerTimestampReader
         kafkaActivePowerRecordReader =
-        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+        new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
 
     // Transform into String
     final StatsToString statsToString = new StatsToString();
-- 
GitLab