From c8034d6cde72de5c6e67da9aebf7a8169e755666 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 22 Oct 2021 10:28:31 +0200
Subject: [PATCH] Migrate uc3-beam-flink

Co-authored-by: Jan Bensien <stu128012@mail.uni-kiel.de>
---
 theodolite-benchmarks/settings.gradle         |   1 +
 .../uc3-beam-flink/Dockerfile                 |   6 +
 .../uc3-beam-flink/build.gradle               |  31 +++
 .../java/application/EventTimePolicy.java     |  35 ++++
 .../main/java/application/HourOfDayKey.java   |  34 ++++
 .../java/application/HourOfDayKeyFactory.java |  24 +++
 .../java/application/HourOfDayKeySerde.java   |  33 ++++
 .../java/application/HourOfDaykeyCoder.java   |  60 ++++++
 .../java/application/StatsAggregation.java    |  46 +++++
 .../java/application/StatsKeyFactory.java     |  17 ++
 .../java/application/Uc3ApplicationBeam.java  | 171 ++++++++++++++++++
 11 files changed, 458 insertions(+)
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/Dockerfile
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/build.gradle
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java

diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 208292aaf..88448110f 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -19,6 +19,7 @@ include 'uc2-beam-flink'
 include 'uc3-load-generator'
 include 'uc3-kstreams'
 include 'uc3-flink'
+include 'uc3-beam-flink'
 
 include 'uc4-load-generator'
 include 'uc4-kstreams'
diff --git a/theodolite-benchmarks/uc3-beam-flink/Dockerfile b/theodolite-benchmarks/uc3-beam-flink/Dockerfile
new file mode 100644
index 000000000..f16ec202e
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/Dockerfile
@@ -0,0 +1,6 @@
+FROM openjdk:8-slim
+
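+# The tar is produced by the Gradle distribution task and is named after the project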
+ADD build/distributions/uc3-beam-flink.tar /
+
+CMD /uc3-beam-flink/bin/uc3-beam-flink --runner=FlinkRunner --flinkMaster=flink-jobmanager:8081 --streaming --parallelism=$PARALLELISM --disableMetrics=true --fasterCopy --stateBackend=rocksdb --stateBackendStoragePath=file:///data/flink/checkpoints
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-beam-flink/build.gradle b/theodolite-benchmarks/uc3-beam-flink/build.gradle
new file mode 100644
index 000000000..b202c53c0
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/build.gradle
@@ -0,0 +1,31 @@
+plugins {
+  id 'theodolite.kstreams'
+}
+
+allprojects {
+  repositories {
+    maven {
+      url 'https://packages.confluent.io/maven/'
+    }
+    mavenCentral()
+  }
+}
+
+dependencies {
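+  // All Beam artifacts are kept on the same version; the Flink 1.12 runner first appeared in Beam 2.27.0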
+  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.27.0'
+  compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+
+  compile('org.apache.beam:beam-sdks-java-io-kafka:2.27.0') {
+    exclude group: 'org.apache.kafka', module: 'kafka-clients'
+  }
+  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+
+  runtime 'org.apache.beam:beam-runners-direct-java:2.27.0'
+  runtime 'org.slf4j:slf4j-api:1.7.32'
+  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+}
+
+
+// This is the path of the main class, stored within ./src/main/java/
+mainClassName = 'application.Uc3ApplicationBeam'
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
new file mode 100644
index 000000000..993951cb3
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
@@ -0,0 +1,35 @@
+package application;
+
+
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/**
+ * TimestampPolicy that uses event time based on the timestamp of the record value.
+ */
+public class EventTimePolicy
+    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
+  protected Instant currentWatermark;
+
+  public EventTimePolicy(final Optional<Instant> previousWatermark) {
+    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+  }
+
+
+  @Override
+  public Instant getTimestampForRecord(final PartitionContext ctx,
+      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
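+    // Advance the watermark to this record's event timestamp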
+    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
+    return this.currentWatermark;
+  }
+
+  @Override
+  public Instant getWatermark(final PartitionContext ctx) {
+    return this.currentWatermark;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
new file mode 100644
index 000000000..55ed8b535
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
@@ -0,0 +1,34 @@
+package application;
+
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/**
+ * Composed key of an hour of the day and a sensor id.
+ */
+@DefaultCoder(AvroCoder.class)
+public class HourOfDayKey {
+
+  private final int hourOfDay;
+  private final String sensorId;
+
+  public HourOfDayKey(final int hourOfDay, final String sensorId) {
+    this.hourOfDay = hourOfDay;
+    this.sensorId = sensorId;
+  }
+
+  public int getHourOfDay() {
+    return this.hourOfDay;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.hourOfDay;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
new file mode 100644
index 000000000..b993a0199
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
@@ -0,0 +1,24 @@
+package application;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link HourOfDayKey}.
+ */
+public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final int hourOfDay = dateTime.getHour();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final HourOfDayKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
new file mode 100644
index 000000000..a0f8e0bba
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
@@ -0,0 +1,33 @@
+package application;
+
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link BufferSerde} for an {@link HourOfDayKey}. Use the {@link #create()} method to create a new
+ * Kafka {@link Serde}.
+ */
+public class HourOfDayKeySerde implements BufferSerde<HourOfDayKey> {
+
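+  // The write order below defines the wire format and must match deserialize()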
+  @Override
+  public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
+    buffer.putInt(data.getHourOfDay());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public HourOfDayKey deserialize(final ReadBuffer buffer) {
+    final int hourOfDay = buffer.getInt();
+    final String sensorId = buffer.getString();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  public static Serde<HourOfDayKey> create() {
+    return SimpleSerdes.create(new HourOfDayKeySerde());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
new file mode 100644
index 000000000..31fe4da7e
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
@@ -0,0 +1,60 @@
+package application;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * Wrapper class that encapsulates a {@link HourOfDayKeySerde} in a Beam {@link Coder}.
+ */
+@SuppressWarnings("serial")
+public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
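+  // Kafka serdes are not serializable, hence the transient field re-created in encode()/decode()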
+  private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
+
+  @Override
+  public void encode(final HourOfDayKey value, final OutputStream outStream)
+      throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = HourOfDayKeySerde.create();
+    }
+    final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
+    final byte[] sizeInBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
+    outStream.write(sizeInBytes);
+    outStream.write(bytes);
+  }
+
+  @Override
+  public HourOfDayKey decode(final InputStream inStream) throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = HourOfDayKeySerde.create();
+    }
+    // readFully() blocks until all bytes are read, unlike InputStream.read()
+    final DataInputStream dataInStream = new DataInputStream(inStream);
+    final byte[] sizeInBytes = new byte[4];
+    dataInStream.readFully(sizeInBytes);
+    final int size = ByteBuffer.wrap(sizeInBytes).getInt();
+    final byte[] bytes = new byte[size];
+    dataInStream.readFully(bytes);
+    return this.innerSerde.deserializer().deserialize("deser", bytes);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    // The wrapped serde is deterministic, so no exception is thrown
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
new file mode 100644
index 000000000..ee5cfc48b
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
@@ -0,0 +1,46 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+/**
+ * {@link CombineFn} aggregating {@link ActivePowerRecord}s into {@link Stats} of their valueInW.
+ */
+@DefaultCoder(AvroCoder.class)
+public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumulator, Stats>
+    implements Serializable {
+  private static final long serialVersionUID = 1L;
+
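+  // Beam may create, add to, and merge accumulators on different workers and in any order,
+  // so the aggregation must be associative and the accumulator encodable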
+  @Override
+  public StatsAccumulator createAccumulator() {
+    return new StatsAccumulator();
+  }
+
+  @Override
+  public StatsAccumulator addInput(final StatsAccumulator accum, final ActivePowerRecord input) {
+    accum.add(input.getValueInW());
+    return accum;
+  }
+
+  @Override
+  public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
+    final StatsAccumulator merged = this.createAccumulator();
+    for (final StatsAccumulator accum : accums) {
+      merged.addAll(accum.snapshot());
+    }
+    return merged;
+  }
+
+  @Override
+  public Stats extractOutput(final StatsAccumulator accum) {
+    return accum.snapshot();
+  }
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
new file mode 100644
index 000000000..820168058
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
@@ -0,0 +1,17 @@
+package application;
+
+import java.time.LocalDateTime;
+
+/**
+ * Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object
+ * and vice versa.
+ *
+ * @param <T> Type of the key
+ */
+public interface StatsKeyFactory<T> {
+
+  T createKey(String sensorId, LocalDateTime dateTime);
+
+  String getSensorId(T key);
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
new file mode 100644
index 000000000..796f5a8cf
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
@@ -0,0 +1,171 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the
+ * Flink runner. To run locally in standalone mode, start Kafka, Zookeeper, the Schema Registry and
+ * the workload generator using the delayed_startup.sh script, and configure the Kafka, Zookeeper
+ * and Schema Registry URLs accordingly. Start a Flink cluster and pass its REST address via the
+ * --flinkMaster run parameter. To persist logs, add
+ * ${workspace_loc:/uc3-beam-flink/eclipseConsoleLogs.log} as Output File under Standard Input
+ * Output in the Common tab of the Run Configuration, then start via Eclipse Run.
+ */
+public class Uc3ApplicationBeam {
+
+
+  @SuppressWarnings("serial")
+  public static void main(final String[] args) {
+
+    // Set Configuration for Windows
+    final int windowDuration = Integer.parseInt(
+        System.getenv("KAFKA_WINDOW_DURATION_DAYS") != null
+            ? System.getenv("KAFKA_WINDOW_DURATION_DAYS")
+            : "30");
+    final Duration duration = Duration.standardDays(windowDuration);
+
+    final int aggregationAdvance = Integer.parseInt(
+        System.getenv("AGGREGATION_ADVANCE_DAYS") != null
+            ? System.getenv("AGGREGATION_ADVANCE_DAYS")
+            : "1");
+    final Duration advance = Duration.standardDays(aggregationAdvance);
+    final int triggerInterval = Integer.parseInt(
+        System.getenv("TRIGGER_INTERVAL") != null
+            ? System.getenv("TRIGGER_INTERVAL")
+            : "15");
+
+    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
+
+    // Set Configuration for Kafka
+    final String bootstrapServer =
+        System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
+            : "my-confluent-cp-kafka:9092";
+    final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
+    final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output";
+    final String schemaRegistryURL =
+        System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
+            : "http://my-confluent-cp-schema-registry:8081";
+
+    // Set consumer configuration for the schema registry and commits back to Kafka
+    final HashMap<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfig.put("schema.registry.url", schemaRegistryURL);
+    consumerConfig.put("specific.avro.reader", "true");
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
+    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+
+
+    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setRunner(FlinkRunner.class);
+    options.setJobName("ucapplication");
+    final Pipeline pipeline = Pipeline.create(options);
+    final CoderRegistry cr = pipeline.getCoderRegistry();
+
+
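+    // Explicitly register coders: Avro records, the custom key, and the accumulator
+    // cannot all be inferred by Beam's default coder inference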
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
+    cr.registerCoderForClass(StatsAggregation.class,
+        SerializableCoder.of(StatsAggregation.class));
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                AvroCoder.of(ActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
+            // Set the TimestampPolicy for event time
+            .withTimestampPolicyFactory(
+                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
+            .withoutMetadata();
+    // Apply pipeline transformations
+    // Read from Kafka
+    pipeline.apply(kafka)
+        // Map to correct time format
+        .apply(MapElements.via(
+            new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>>() {
+              final ZoneId zone = ZoneId.of("Europe/Paris");
+
+              @Override
+              public KV<HourOfDayKey, ActivePowerRecord> apply(
+                  final KV<String, ActivePowerRecord> kv) {
+                final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
+                final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
+                return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
+                    kv.getValue());
+              }
+            }))
+
+        // Apply a sliding window
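+        // spanning the configured duration, advancing by the configured advance period; early
+        // panes fire triggerDelay after the first element and accumulate across firings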
+        .apply(Window
+            .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance))
+            .triggering(AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes())
+
+        // Aggregate per window for every key
+        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(
+            new StatsAggregation()))
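+        // The output coder for KV<HourOfDayKey, Stats> cannot be inferred, so set it explicitly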
+        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
+
+        // Map into correct output format
+        .apply(MapElements
+            .via(new SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>>() {
+              @Override
+              public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
+                return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
+              }
+            }))
+        // Write to Kafka
+        .apply(KafkaIO.<String, String>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(StringSerializer.class));
+
+    pipeline.run().waitUntilFinish();
+  }
+}
+
-- 
GitLab