From 72cd11197edc55665621a9fc9d237f1ca7904a20 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 25 May 2022 16:45:42 +0200
Subject: [PATCH] Extend commons-hazelcastjet infrastructure

---
 .../hazelcastjet/HazelcastJetService.java     | 42 ++++++++++------
 .../commons/hazelcastjet/PipelineFactory.java | 49 +++++++++++++++++--
 2 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
index 93bd6557f..cb39e99a7 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java
@@ -5,37 +5,42 @@ import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
 import org.apache.commons.configuration2.Configuration;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 
 public abstract class HazelcastJetService {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastJetService.class);
+  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
+
   protected final Configuration config = ServiceConfigurations.createWithDefaults();
   protected final String kafkaBootstrapServer;
   protected final String schemaRegistryUrl;
   protected final String jobName;
-  protected final String kafkaInputTopic;
 
-  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
+  protected final String kafkaInputTopic;
 
   protected PipelineFactory pipelineFactory;
-  private final JobConfig jobConfig = new JobConfig();
+  protected final JobConfig jobConfig = new JobConfig();
+  protected final KafkaPropertiesBuilder propsBuilder;
 
-  private Pipeline pipeline;
-
-  JetInstance jetInstance;
-
-  protected final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
+  private final JetInstance jetInstance;
 
 
   /**
    * Instantiate a new abstract service.
+   * Retrieves needed fields using ServiceConfiguration and build a new jet instance.
    */
   public HazelcastJetService(final Logger logger) {
-    this.kafkaBootstrapServer = config.getProperty(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
     this.jobName = config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
+
+    this.kafkaBootstrapServer = config.getProperty(
+        ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
     this.schemaRegistryUrl = config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
-    this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
+    this.propsBuilder =
+        new KafkaPropertiesBuilder(kafkaBootstrapServer, schemaRegistryUrl, jobName);
 
+    this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
 
     final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
         .setConfigFromEnv(logger, kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
@@ -44,21 +49,26 @@ public abstract class HazelcastJetService {
 
 
   /**
+   * Constructs and starts the pipeline.
    * First initiates a pipeline,
    * Second register the corresponding serializers,
    * Third set the job name,
-   * Lastly, Add the job to the hazelcast instance.
+   * Lastly, add the job to the hazelcast instance.
    */
   public void run() {
-    this.pipeline  = pipelineFactory.buildPipeline();
-    registerSerializer();
-    jobConfig.setName(config.getString("name"));
-    this.jetInstance.newJobIfAbsent(pipeline, jobConfig).join();
+    try {
+      final Pipeline pipeline  = pipelineFactory.buildPipeline();
+      registerSerializer();
+      jobConfig.setName(config.getString("name"));
+      this.jetInstance.newJobIfAbsent(pipeline, jobConfig).join();
+    } catch (final Exception e) { // NOPMD
+      LOGGER.error("ABORT MISSION!:", e);
+    }
   }
 
 
   /**
-   * Needs to be implemented to register the needed Serializer.
+   * Needs to be implemented by subclasses to register the needed Serializer.
    */
   protected abstract void registerSerializer();
 
diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
index d73d809b3..6f1b2d935 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java
@@ -1,20 +1,61 @@
 package rocks.theodolite.benchmarks.commons.hazelcastjet;
 
 import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.StreamStage;
+
 import java.util.Properties;
 
+/**
+ * Abstract class to handle the common logic for all pipelines.
+ * Implement {@link #buildPipeline()} method to implement the custom logic of the use case.
+ * Caution implement this with the help of an extendPipeline() method in order to
+ * be testable without further infrastructure.
+ * A template for this is construct the sources in {@link #buildPipeline()} and give them
+ * as parameters to extendTopology(...).
+ * Further implement the pipeline logic in the extendPipeline() method and return the last stage.
+ * Use the returned stage afterwards in the {@link #buildPipeline()} to write the results.
+ */
 public abstract class PipelineFactory {
 
-  final Pipeline pipe;
+  protected final Pipeline pipe;
+
+  protected Properties kafkaReadPropsForPipeline;
+  protected Properties kafkaWritePropsForPipeline;
+
+  protected String kafkaInputTopic;
+  protected String kafkaOutputTopic;
+
 
   public PipelineFactory() {
     this.pipe = Pipeline.create();
+  }
 
+  /**
+   * Constructs a pipeline factory with read properties and input topic.
+   * Directly used for Uc1.
+   */
+  public PipelineFactory(final Properties kafkaReadPropsForPipeline,
+                         final String kafkaInputTopic) {
+    this();
+    this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline;
+    this.kafkaInputTopic = kafkaInputTopic;
   }
 
-  public abstract Pipeline buildPipeline();
+  /**
+   * Constructs a pipeline factory with read/write properties and input/output topic.
+   */
+  public PipelineFactory(final Properties kafkaReadPropsForPipeline,
+                         final String kafkaInputTopic,
+                         final Properties kafkaWritePropsForPipeline,
+                         final String kafkaOutputTopic) {
+    this(kafkaReadPropsForPipeline, kafkaInputTopic);
+    this.kafkaWritePropsForPipeline = kafkaWritePropsForPipeline;
+    this.kafkaOutputTopic = kafkaOutputTopic;
+  }
 
-  public abstract StreamStage extendTopology();
+  /**
+   * Implement to construct the use case logic.
+   * @return pipeline that holds the use case logic.
+   */
+  public abstract Pipeline buildPipeline();
 
 }
-- 
GitLab