From e623b01239ec56801d9a8a401d42b391cfd08e73 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Sat, 19 Mar 2022 15:45:39 +0100
Subject: [PATCH] Add generic db interface to hzjet uc1

---
 .../src/main/groovy/theodolite.hazelcastjet.gradle |  1 -
 .../uc1-hazelcastjet/build.gradle                  |  4 ++++
 .../uc1/hazelcastjet/Uc1PipelineBuilder.java       | 14 +++++++++-----
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
index 826d4c750..5c600d77a 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
@@ -19,7 +19,6 @@ repositories {
 dependencies {
   implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
   implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
-  implementation 'com.google.code.gson:gson:2.8.2'
   implementation 'com.google.guava:guava:24.1-jre'
   implementation 'org.slf4j:slf4j-api:1.7.25'
   implementation 'io.confluent:kafka-avro-serializer:5.3.0'
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
index 5d9df33f1..cac5ad9f6 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
+++ b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle
@@ -2,4 +2,8 @@ plugins {
   id 'theodolite.hazelcastjet'
 }
 
+dependencies {
+    implementation project(':uc1-commons')
+}
+
 mainClassName = "rocks.theodolite.benchmarks.uc1.hazelcastjet.HistoryService"
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
index ba176de69..aaebe19ce 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java
@@ -1,6 +1,5 @@
 package rocks.theodolite.benchmarks.uc1.hazelcastjet;
 
-import com.google.gson.Gson;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
 import com.hazelcast.jet.pipeline.Sinks;
@@ -8,6 +7,8 @@ import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import java.util.Map.Entry;
 import java.util.Properties;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -16,7 +17,7 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public class Uc1PipelineBuilder {
 
-  private static final Gson GSON = new Gson();
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
 
   /**
    * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
@@ -58,14 +59,17 @@ public class Uc1PipelineBuilder {
    */
   public StreamStage<String> extendUc1Topology(final Pipeline pipe,
       final StreamSource<Entry<String, ActivePowerRecord>> source) {
+
     // Build the pipeline topology
     return pipe.readFrom(source)
         .withNativeTimestamps(0)
         .setLocalParallelism(1)
         .setName("Log content")
-        .map(record -> {
-          return GSON.toJson(record);
-        });
+        .map(Entry::getValue)
+        .map(this.databaseAdapter.getRecordConverter()::convert);
+
   }
 
+
+
 }
-- 
GitLab