From ab067c3568d4409f69262e0fcf19cb213e11b25f Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Tue, 22 Mar 2022 15:05:51 +0100
Subject: [PATCH] Adapt uc1 hzj to use DatabaseWriter

---
 .../groovy/theodolite.hazelcastjet.gradle     |  5 +-
 .../uc1-hazelcastjet/build.gradle             |  2 +
 .../uc1/hazelcastjet/Uc1PipelineBuilder.java  | 21 +++++---
 .../uc1/hazelcast/Uc1PipelineTest.java        | 54 +++++++++++++++++--
 4 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
index 5c600d77a..c4b4cef24 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
@@ -20,7 +20,10 @@ dependencies {
   implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
   implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
   implementation 'com.google.guava:guava:24.1-jre'
-  implementation 'org.slf4j:slf4j-api:1.7.25'
+  implementation 'org.slf4j:slf4j-api:1.7.30'
+  implementation 'org.slf4j:slf4j-jdk14:1.7.30'
+
+
   implementation 'io.confluent:kafka-avro-serializer:5.3.0'
 
   implementation 'com.hazelcast.jet:hazelcast-jet:4.5'
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
index cac5ad9f6..7decf2531 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
+++ b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
@@ -4,6 +4,8 @@ plugins {
 
 dependencies {
     implementation project(':uc1-commons')
+
+    implementation 'org.slf4j:slf4j-jdk14:1.7.30'
 }
 
 mainClassName = "rocks.theodolite.benchmarks.uc1.hazelcastjet.HistoryService"
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
index aaebe19ce..c02ea1e7e 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
@@ -1,13 +1,16 @@
 package rocks.theodolite.benchmarks.uc1.hazelcastjet;
 
+import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
+
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.Sink;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import java.util.Map.Entry;
 import java.util.Properties;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
 import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import titan.ccp.model.records.ActivePowerRecord;
 
@@ -39,7 +42,15 @@ public class Uc1PipelineBuilder {
     final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);
 
     // Add Sink: Logger
-    uc1TopologyProduct.writeTo(Sinks.logger());
+    // Do not refactor this to just use the call
+    // (There is a problem with static calls in functions in hazelcastjet)
+    final DatabaseWriter<String> writer = this.databaseAdapter.getDatabaseWriter();
+    final Sink<String> sink = sinkBuilder(
+        "Sink into database", x -> writer)
+        .<String>receiveFn(DatabaseWriter::write)
+        .build();
+
+    uc1TopologyProduct.writeTo(sink);
 
     return pipe;
   }
@@ -64,12 +75,8 @@ public class Uc1PipelineBuilder {
     return pipe.readFrom(source)
         .withNativeTimestamps(0)
         .setLocalParallelism(1)
-        .setName("Log content")
+        .setName("Convert content")
         .map(Entry::getValue)
         .map(this.databaseAdapter.getRecordConverter()::convert);
-
   }
-
-
-
 }
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
index 4457ce89a..525327ddb 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
@@ -5,6 +5,7 @@ import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JetConfig;
 import com.hazelcast.jet.core.JetTestSupport;
 import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sink;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
@@ -14,14 +15,22 @@ import com.hazelcast.jet.test.SerialTest;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletionException;
+import com.hazelcast.logging.ILogger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineBuilder;
 import titan.ccp.model.records.ActivePowerRecord;
 
+import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
+import static com.hazelcast.logging.Logger.getLogger;
+
 /**
  * Test methods for the Hazelcast Jet Implementation of UC1.
  */
@@ -32,13 +41,24 @@ public class Uc1PipelineTest extends JetTestSupport {
   private Pipeline testPipeline = null;
   private StreamStage<String> uc1Topology = null;
 
+  // Standart Logger
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Uc1PipelineTest.class);
+  // HazelcastJet Logger
+  private static final ILogger logger =  getLogger(Uc1PipelineTest.class);
+
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
   /**
-   * Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC2 topology.
+   * Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC1 topology.
    * Allows for quick extension of tests.
    */
   @Before
   public void buildUc1Pipeline() {
 
+    this.logger.info("Hazelcast Logger");
+    LOGGER.info("Standard Logger");
+
+
     // Setup Configuration
     final int testItemsPerSecond = 1;
     final String testSensorName = "TEST_SENSOR";
@@ -47,8 +67,10 @@ public class Uc1PipelineTest extends JetTestSupport {
     // Create mock jet instance with configuration
     final String testClusterName = randomName();
     final JetConfig testJetConfig = new JetConfig();
+//    testJetConfig.setProperty( "hazelcast.logging.type", "slf4j" );
     testJetConfig.getHazelcastConfig().setClusterName(testClusterName);
-    this.testInstance = this.createJetMember(testJetConfig);
+    this.testInstance = createJetMember(testJetConfig);
+
 
     // Create a test source
     final StreamSource<Entry<String, ActivePowerRecord>> testSource =
@@ -66,6 +88,22 @@ public class Uc1PipelineTest extends JetTestSupport {
     this.uc1Topology =
         pipelineBuilder.extendUc1Topology(this.testPipeline, testSource);
 
+    // Create DatabaseWriter sink
+    final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter();
+    final Sink<String> sink = sinkBuilder(
+        "database-sink", x -> adapter)
+        .<String>receiveFn(DatabaseWriter::write)
+        .build();
+
+//    Map Stage, can be used instead of sink
+//    StreamStage<String> log = uc1Topology.map(s -> {
+//        LOGGER.info(s);
+//        return s;
+//    });
+//    log.writeTo(sink);
+
+    //apply sink
+    this.uc1Topology.writeTo(sink);
   }
 
   /**
@@ -78,10 +116,18 @@ public class Uc1PipelineTest extends JetTestSupport {
     final int assertTimeoutSeconds = 6;
     final int assertCollectedItems = 5;
 
+    LOGGER.info("Pipeline build successfully, starting test");
+
     // Assertion
     this.uc1Topology.apply(Assertions.assertCollectedEventually(assertTimeoutSeconds,
-        collection -> Assert.assertTrue("Not enough data arrived in the end",
-            collection.size() >= assertCollectedItems)));
+        collection -> {
+      //print the newest Record
+//      LOGGER.info(collection.get(collection.size()-1));
+
+          // Run pipeline until 5th item
+          Assert.assertTrue("Not enough data arrived in the end",
+              collection.size() >= assertCollectedItems);
+        }));
 
     // Test the UC1 Pipeline Recreation
     try {
-- 
GitLab