diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle index 826d4c750e201bb699a4526f7b474a095b4c652a..5c600d77a9b9b13ee11c8f3e3020a10a80e7f31d 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle @@ -19,7 +19,6 @@ repositories { dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } - implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-api:1.7.25' implementation 'io.confluent:kafka-avro-serializer:5.3.0' diff --git a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle index 5d9df33f1a2cbc21acc3a18851e050dab7b3d58d..cac5ad9f6f12b62389236decbe75fbec01050071 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/build.gradle +++ b/theodolite-benchmarks/uc1-hazelcastjet/build.gradle @@ -2,4 +2,8 @@ plugins { id 'theodolite.hazelcastjet' } +dependencies { + implementation project(':uc1-commons') +} + mainClassName = "rocks.theodolite.benchmarks.uc1.hazelcastjet.HistoryService" diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java index ba176de69a1b2853371db98307c503806cfeb063..aaebe19ce88b8d79430882126804cd95531758e4 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java @@ -1,6 +1,5 @@ package rocks.theodolite.benchmarks.uc1.hazelcastjet; -import com.google.gson.Gson; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; @@ -8,6 +7,8 @@ import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import java.util.Map.Entry; import java.util.Properties; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import titan.ccp.model.records.ActivePowerRecord; /** @@ -16,7 +17,7 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class Uc1PipelineBuilder { - private static final Gson GSON = new Gson(); + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. @@ -58,14 +59,17 @@ public class Uc1PipelineBuilder { */ public StreamStage<String> extendUc1Topology(final Pipeline pipe, final StreamSource<Entry<String, ActivePowerRecord>> source) { + // Build the pipeline topology return pipe.readFrom(source) .withNativeTimestamps(0) .setLocalParallelism(1) .setName("Log content") - .map(record -> { - return GSON.toJson(record); - }); + .map(Entry::getValue) + .map(this.databaseAdapter.getRecordConverter()::convert); + } + + }