Skip to content
Snippets Groups Projects
Commit e623b012 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add generic db interface to hzjet uc1

parent 2f1e39ea
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #7161 failed
......@@ -19,7 +19,6 @@ repositories {
dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-api:1.7.25'
implementation 'io.confluent:kafka-avro-serializer:5.3.0'
......
......@@ -2,4 +2,8 @@ plugins {
id 'theodolite.hazelcastjet'
}
dependencies {
implementation project(':uc1-commons')
}
mainClassName = "rocks.theodolite.benchmarks.uc1.hazelcastjet.HistoryService"
package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import com.google.gson.Gson;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
......@@ -8,6 +7,8 @@ import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Map.Entry;
import java.util.Properties;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
......@@ -16,7 +17,7 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public class Uc1PipelineBuilder {
private static final Gson GSON = new Gson();
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
......@@ -58,14 +59,17 @@ public class Uc1PipelineBuilder {
*/
public StreamStage<String> extendUc1Topology(final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology
return pipe.readFrom(source)
.withNativeTimestamps(0)
.setLocalParallelism(1)
.setName("Log content")
.map(record -> {
return GSON.toJson(record);
});
.map(Entry::getValue)
.map(this.databaseAdapter.getRecordConverter()::convert);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment