From 93309ba8579634ac6c83b6845ff7bcd2eacaba62 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 20 May 2022 15:39:07 +0200
Subject: [PATCH] Refactor hazelcast jet benchmarks:

Add HazelcastJetService for commons
Add PipelineFactory for commons
Add Uc1PipelineFactory
Add NewHistoryService for stepwise migration
---
 .../hazelcastjet/HazelcastJetService.java     | 65 +++++++++++++
 .../commons/hazelcastjet/PipelineFactory.java | 20 ++++
 .../uc1/hazelcastjet/NewHistoryService.java   | 48 ++++++++++
 .../uc1/hazelcastjet/Uc1PipelineFactory.java  | 95 +++++++++++++++++++
 4 files changed, 228 insertions(+)
 create mode 100644 theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
 create mode 100644 theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
 create mode 100644 theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
 create mode 100644 theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
new file mode 100644
index 000000000..93bd6557f
--- /dev/null
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
@@ -0,0 +1,65 @@
+package rocks.theodolite.benchmarks.commons.hazelcastjet;
+
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.config.JobConfig;
+import com.hazelcast.jet.pipeline.Pipeline;
+import org.apache.commons.configuration2.Configuration;
+import org.slf4j.Logger;
+import titan.ccp.common.configuration.ServiceConfigurations;
+
+public abstract class HazelcastJetService {
+
+  protected final Configuration config = ServiceConfigurations.createWithDefaults();
+  protected final String kafkaBootstrapServer;
+  protected final String schemaRegistryUrl;
+  protected final String jobName;
+  protected final String kafkaInputTopic;
+
+  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
+
+  protected PipelineFactory pipelineFactory;
+  private final JobConfig jobConfig = new JobConfig();
+
+  private Pipeline pipeline;
+
+  JetInstance jetInstance;
+
+  protected final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
+
+
+  /**
+   * Instantiate a new abstract service.
+   */
+  public HazelcastJetService(final Logger logger) {
+    this.kafkaBootstrapServer = config.getProperty(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
+    this.jobName = config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
+    this.schemaRegistryUrl = config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
+    this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
+
+
+    final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
+        .setConfigFromEnv(logger, kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
+    this.jetInstance = jetInstance.build();
+  }
+
+
+  /**
+   * First initiates a pipeline,
+   * Second register the corresponding serializers,
+   * Third set the job name,
+   * Lastly, Add the job to the hazelcast instance.
+   */
+  public void run() {
+    this.pipeline  = pipelineFactory.buildPipeline();
+    registerSerializer();
+    jobConfig.setName(config.getString("name"));
+    this.jetInstance.newJobIfAbsent(pipeline, jobConfig).join();
+  }
+
+
+  /**
+   * Needs to be implemented to register the needed Serializer.
+   */
+  protected abstract void registerSerializer();
+
+}
diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
new file mode 100644
index 000000000..d73d809b3
--- /dev/null
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
@@ -0,0 +1,20 @@
+package rocks.theodolite.benchmarks.commons.hazelcastjet;
+
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.StreamStage;
+import java.util.Properties;
+
+public abstract class PipelineFactory {
+
+  final Pipeline pipe;
+
+  public PipelineFactory() {
+    this.pipe = Pipeline.create();
+
+  }
+
+  public abstract Pipeline buildPipeline();
+
+  public abstract StreamStage extendTopology();
+
+}
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
new file mode 100644
index 000000000..c0e25cf95
--- /dev/null
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
@@ -0,0 +1,48 @@
+package rocks.theodolite.benchmarks.uc1.hazelcastjet;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
+
+import java.util.Properties;
+
+public class NewHistoryService extends HazelcastJetService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
+
+  public NewHistoryService(final Logger logger) {
+    super(logger);
+    final Properties kafkaProps =
+        this.propsBuilder.buildKafkaInputReadPropsFromEnv(this.kafkaBootstrapServer,
+            schemaRegistryUrl,
+            jobName,
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
+    this.pipelineFactory = new Uc1PipelineFactory(kafkaProps, this.kafkaInputTopic);
+
+  }
+
+
+  @Override
+  public void run() {
+    try {
+      super.run();
+    } catch (final Exception e) { // NOPMD
+      LOGGER.error("ABORT MISSION!: {}", e);
+    }
+  }
+
+  @Override
+  protected void registerSerializer() {
+    // empty since we need no serializer in uc1
+  }
+
+  public static void main(final String[] args) {
+    new NewHistoryService(LOGGER).run();
+  }
+
+
+}
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java
new file mode 100644
index 000000000..5c4e97626
--- /dev/null
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java
@@ -0,0 +1,95 @@
+package rocks.theodolite.benchmarks.uc1.hazelcastjet;
+
+import com.hazelcast.jet.kafka.KafkaSources;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sink;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
+import titan.ccp.model.records.ActivePowerRecord;
+import java.util.Map;
+import java.util.Properties;
+import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
+
+public class Uc1PipelineFactory extends PipelineFactory {
+
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
+  private final Properties kafkaPropsForPipeline;
+  private final String kafkaInputTopic;
+
+  /**
+   * Creates a new Uc1PipelineFactory.
+   * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes.
+   * @param kafkaInputTopic The name of the input topic used for the pipeline.
+   */
+  public Uc1PipelineFactory(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) {
+    super();
+    this.kafkaPropsForPipeline = kafkaPropsForPipeline;
+    this.kafkaInputTopic = kafkaInputTopic;
+  }
+
+  /**
+   * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
+   *
+   * @return A Hazelcast Jet pipeline which processes data for Uc1.
+   */
+  public Pipeline buildPipeline() {
+
+    // Define a new pipeline
+    final Pipeline pipe = Pipeline.create();
+
+    // Define the Kafka Source
+    final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
+        KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic);
+
+    // Extend UC1 topology to the pipeline
+    final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);
+
+    // Add Sink: Logger
+    // Do not refactor this to just use the call
+    // (There is a problem with static calls in functions in hazelcastjet)
+    final DatabaseWriter<String> writer = this.databaseAdapter.getDatabaseWriter();
+    final Sink<String> sink = sinkBuilder(
+        "Sink into database", x -> writer)
+        .<String>receiveFn(DatabaseWriter::write)
+        .build();
+
+    uc1TopologyProduct.writeTo(sink);
+
+    return pipe;
+  }
+
+  @Override
+  public StreamStage extendTopology() {
+    return null;
+  }
+
+  /**
+   * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by Theodolite.
+   *
+   * <p>
+   * UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into JSON strings
+   * using GSON.
+   * </p>
+   *
+   * @param pipe The blank hazelcast jet pipeline to extend the logic to.
+   * @param source A streaming source to fetch data from.
+   * @return A {@code StreamStage<String>} with the above definition of the String. It can be used
+   *         to be further modified or directly be written into a sink.
+   */
+  public StreamStage<String> extendUc1Topology(final Pipeline pipe,
+                                               final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+
+    // Build the pipeline topology
+    return pipe.readFrom(source)
+        .withNativeTimestamps(0)
+        .setLocalParallelism(1)
+        .setName("Convert content")
+        .map(Map.Entry::getValue)
+        .map(this.databaseAdapter.getRecordConverter()::convert);
+  }
+}
-- 
GitLab