Skip to content
Snippets Groups Projects
Commit ef8d6e2e authored by Sören Henning's avatar Sören Henning
Browse files

Introduce generic interface for database storage

parent 37d9dc5f
No related branches found
No related tags found
1 merge request: !245 "Firestore sink for UC1 Beam"
Pipeline #6524 failed
......@@ -2,4 +2,8 @@ plugins {
id 'theodolite.flink'
}
dependencies {
implementation project(':uc1-commons')
}
mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
package theodolite.uc1.application;
import org.apache.flink.api.common.functions.MapFunction;
import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Flink {@link MapFunction} adapter that delegates each incoming
 * {@link ActivePowerRecord} to a wrapped {@link RecordConverter}, producing a
 * value of the converter's target type {@code T}.
 *
 * @param <T> type the wrapped converter produces for each record
 */
public class ConverterAdapter<T> implements MapFunction<ActivePowerRecord, T> {
  private static final long serialVersionUID = -5263671231838353747L; // NOPMD

  // Must be serializable, since Flink ships this function to task managers.
  private final RecordConverter<T> converter;

  /**
   * Create a new adapter delegating to the given converter.
   *
   * @param recordConverter converter applied to every record; must be serializable
   */
  public ConverterAdapter(final RecordConverter<T> recordConverter) {
    this.converter = recordConverter;
  }

  @Override
  public T map(final ActivePowerRecord record) throws Exception {
    final T converted = this.converter.convert(record);
    return converted;
  }
}
......@@ -7,6 +7,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import theodolite.commons.flink.KafkaConnectorFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -22,6 +24,8 @@ public final class HistoryServiceFlinkJob {
private final StreamExecutionEnvironment env;
private final String applicationId;
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
......@@ -69,9 +73,10 @@ public final class HistoryServiceFlinkJob {
stream
// .rebalance()
.map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used
.map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
.returns(Types.STRING)
.flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
.returns(Types.VOID); // Will never be used
}
/**
......
package theodolite.uc1.application;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
 * Flink {@link FlatMapFunction} adapter that delegates each incoming value to a
 * wrapped {@link DatabaseWriter} (as obtained from a {@link DatabaseAdapter}).
 * Nothing is ever emitted to the {@link Collector}; the function acts as a
 * terminal sink with output type {@link Void}.
 *
 * @param <T> type of the values written to the database
 */
public class WriterAdapter<T> implements FlatMapFunction<T, Void> {
  private static final long serialVersionUID = -5263671231838353747L; // NOPMD

  // Must be serializable, since Flink ships this function to task managers.
  private final DatabaseWriter<T> writer;

  /**
   * Create a new adapter delegating to the given writer.
   *
   * @param databaseWriter writer invoked for every value; must be serializable
   */
  public WriterAdapter(final DatabaseWriter<T> databaseWriter) {
    this.writer = databaseWriter;
  }

  @Override
  public void flatMap(final T value, final Collector<Void> out) throws Exception {
    // Intentionally emits nothing downstream; writing is the side effect.
    this.writer.write(value);
  }
}
......@@ -2,4 +2,8 @@ plugins {
id 'theodolite.kstreams'
}
dependencies {
implementation project(':uc1-commons')
}
mainClassName = "theodolite.uc1.application.HistoryService"
package theodolite.uc1.streamprocessing;
import com.google.gson.Gson;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -16,12 +15,11 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private static final Gson GSON = new Gson();
private final String inputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
private final StreamsBuilder builder = new StreamsBuilder();
......@@ -42,8 +40,8 @@ public class TopologyBuilder {
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.mapValues(v -> GSON.toJson(v))
.foreach((k, record) -> LOGGER.info("Record: {}", record));
.mapValues(this.databaseAdapter.getRecordConverter()::convert)
.foreach((k, record) -> this.databaseAdapter.getDatabaseWriter().write(record));
return this.builder.build(properties);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment