From ba916c0d4976688d83d44b1b8cc052372264626f Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Tue, 7 Dec 2021 19:34:28 +0100 Subject: [PATCH] Change types of consumerConfig + types of WriterTransformations uc1-3 --- .../src/main/java/application/Uc1BeamPipeline.java | 2 +- .../src/main/java/application/Uc2BeamPipeline.java | 9 ++++----- .../src/main/java/application/Uc3BeamPipeline.java | 9 ++------- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index 024822080..46fa53756 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); // build KafkaConsumerConfig - final Map consumerConfig = buildConsumerConfig(); + final Map<String, Object> consumerConfig = buildConsumerConfig(); // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index f518ba8a7..3b43dc47a 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; @@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc2BeamPipeline extends AbstractPipeline { - protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) { + protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); // Additional needed variables final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); @@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - final Map consumerConfig = buildConsumerConfig(); + final Map<String, Object> consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); @@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final StatsToString statsToString = new StatsToString(); // Write to Kafka - final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = - new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); + final KafkaWriterTransformation<String> kafkaWriter = + new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); // Apply pipeline transformations this.apply(kafkaActivePowerRecordReader) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index eda0d8161..7424a19fe 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; @@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline { registerCoders(cr); // Read from Kafka - @SuppressWarnings({"rawtypes", "unchecked"}) final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); @@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); // Write to Kafka - @SuppressWarnings({"rawtypes", "unchecked"}) - final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = - new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); + final KafkaWriterTransformation<String> kafkaWriter = + new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); this.apply(kafka) // Map to correct time format -- GitLab