From 92555bcacda371e84b3b0cdca5c33a62a1dec076 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 25 May 2022 16:49:36 +0200
Subject: [PATCH] Refactor new uc1-hazelcastjet structure

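The UC1 Hazelcast Jet benchmark now delegates shared state and
configuration handling to the common base classes: NewHistoryService
builds its Kafka read properties through the props builder inherited
from HazelcastJetService, and Uc1PipelineFactory hands the Kafka
properties and the input topic to the PipelineFactory superclass
instead of keeping its own copies. The unused extendTopology() stub
and the local run() override are removed.

For orientation, a rough sketch of the PipelineFactory shape this
patch builds on (the field and constructor names follow their usage
in Uc1PipelineFactory; visibility, the initialization of pipe, and
the abstract method are assumptions, not taken from this diff):

    // Sketch only: field names follow their usage in Uc1PipelineFactory;
    // visibility, pipe creation, and the abstract method are assumed.
    import com.hazelcast.jet.pipeline.Pipeline;
    import java.util.Properties;

    public abstract class PipelineFactory {

      protected final Pipeline pipe = Pipeline.create();
      protected final Properties kafkaReadPropsForPipeline;
      protected final String kafkaInputTopic;

      protected PipelineFactory(final Properties kafkaReadPropsForPipeline,
          final String kafkaInputTopic) {
        this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline;
        this.kafkaInputTopic = kafkaInputTopic;
      }

      public abstract Pipeline buildPipeline();
    }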
---
 .../uc1/hazelcastjet/NewHistoryService.java   | 30 +++++++---------
 .../uc1/hazelcastjet/Uc1PipelineFactory.java  | 35 +++++++------------
 2 files changed, 24 insertions(+), 41 deletions(-)

diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
index c0e25cf95..3f3502ff1 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java
@@ -1,23 +1,27 @@
 package rocks.theodolite.benchmarks.uc1.hazelcastjet;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
 
-import java.util.Properties;
-
+/**
+ * A microservice that records incoming measurements.
+ */
 public class NewHistoryService extends HazelcastJetService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);
 
-  public NewHistoryService(final Logger logger) {
-    super(logger);
+  /**
+   * Constructs the use case logic for UC1. Builds the required Kafka read
+   * properties and instantiates the pipeline factory.
+   */
+  public NewHistoryService() {
+    super(LOGGER);
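+    // Build the consumer properties for the Kafka source via the inherited props builder.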
     final Properties kafkaProps =
-        this.propsBuilder.buildKafkaInputReadPropsFromEnv(this.kafkaBootstrapServer,
-            schemaRegistryUrl,
-            jobName,
+        this.propsBuilder.buildReadProperties(
             StringDeserializer.class.getCanonicalName(),
             KafkaAvroDeserializer.class.getCanonicalName());
 
@@ -25,23 +29,13 @@ public class NewHistoryService extends HazelcastJetService {
 
   }
 
-
-  @Override
-  public void run() {
-    try {
-      super.run();
-    } catch (final Exception e) { // NOPMD
-      LOGGER.error("ABORT MISSION!: {}", e);
-    }
-  }
-
   @Override
   protected void registerSerializer() {
     // empty since we need no serializer in uc1
   }
 
   public static void main(final String[] args) {
-    new NewHistoryService(LOGGER).run();
+    new NewHistoryService().run();
   }
 
 
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java
index 5c4e97626..9fb1cc05a 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java
@@ -1,35 +1,32 @@
 package rocks.theodolite.benchmarks.uc1.hazelcastjet;
 
+import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
+
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
 import com.hazelcast.jet.pipeline.Sink;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
+import java.util.Map;
+import java.util.Properties;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
 import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import titan.ccp.model.records.ActivePowerRecord;
-import java.util.Map;
-import java.util.Properties;
-import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
 
 public class Uc1PipelineFactory extends PipelineFactory {
 
   private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
 
-  private final Properties kafkaPropsForPipeline;
-  private final String kafkaInputTopic;
-
   /**
    * Creates a new Uc1PipelineFactory.
-   * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes.
+   * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes.
    * @param kafkaInputTopic The name of the input topic used for the pipeline.
    */
-  public Uc1PipelineFactory(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) {
-    super();
-    this.kafkaPropsForPipeline = kafkaPropsForPipeline;
-    this.kafkaInputTopic = kafkaInputTopic;
+  public Uc1PipelineFactory(final Properties kafkaReadPropsForPipeline,
+                            final String kafkaInputTopic) {
+    super(kafkaReadPropsForPipeline, kafkaInputTopic);
   }
 
   /**
@@ -39,15 +36,12 @@ public class Uc1PipelineFactory extends PipelineFactory {
    */
   public Pipeline buildPipeline() {
 
-    // Define a new pipeline
-    final Pipeline pipe = Pipeline.create();
-
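+    // The Pipeline instance (pipe) is provided by the PipelineFactory superclass.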
     // Define the Kafka Source
     final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
-        KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic);
+        KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
 
     // Extend UC1 topology to the pipeline
-    final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);
+    final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource);
 
     // Add Sink: Logger
     // Do not refactor this to just use the call
@@ -63,10 +57,6 @@ public class Uc1PipelineFactory extends PipelineFactory {
     return pipe;
   }
 
-  @Override
-  public StreamStage extendTopology() {
-    return null;
-  }
 
   /**
    * Extends the Hazelcast Jet pipeline with the UC1 topology defined by Theodolite.
@@ -76,13 +66,12 @@ public class Uc1PipelineFactory extends PipelineFactory {
    * using GSON.
    * </p>
    *
-   * @param pipe The blank hazelcast jet pipeline to extend the logic to.
    * @param source A streaming source to fetch data from.
    * @return A {@code StreamStage<String>} of the JSON strings described above. It can be
    *         further modified or written directly into a sink.
    */
-  public StreamStage<String> extendUc1Topology(final Pipeline pipe,
-                                               final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+  public StreamStage<String>
+      extendUc1Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
 
     // Build the pipeline topology
     return pipe.readFrom(source)
-- 
GitLab