Commit 1bc1b3f4 authored by Simon Ehrenstein

Merge master and version fix

parents 03bf64ff 584f5617
1 merge request: !243 Migrate to Strimzi Kafka
Showing changed files with 154 additions and 11 deletions
package rocks.theodolite.benchmarks.uc1.commons;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Converts an {@link ActivePowerRecord} to the type required by a database.
*
* @param <T> Type required by the database.
*/
@FunctionalInterface
public interface RecordConverter<T> {
T convert(ActivePowerRecord record);
}
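
The interface admits converters other than JSON. For illustration only (not part of this commit), a minimal sketch of an alternative implementation, assuming the Avro-generated getters getIdentifier(), getTimestamp(), and getValueInW() on ActivePowerRecord:

public class CsvConverter implements RecordConverter<String> {
  @Override
  public String convert(final ActivePowerRecord record) {
    // One CSV line per record: identifier,timestamp,valueInW
    return record.getIdentifier() + "," + record.getTimestamp() + "," + record.getValueInW();
  }
}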
-package theodolite.uc1.application;
+package rocks.theodolite.benchmarks.uc1.commons.logger;

 import com.google.gson.Gson;
-import org.apache.flink.api.common.functions.MapFunction;
+import java.io.Serializable;
+import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
 import titan.ccp.model.records.ActivePowerRecord;

 /**
- * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
- * strings.
+ * {@link RecordConverter} that converts {@link ActivePowerRecord}s to JSON strings.
  */
-public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
+public class JsonConverter implements RecordConverter<String>, Serializable {

-  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
+  private static final long serialVersionUID = -5263671231838353748L; // NOPMD

   private static final Gson GSON = new Gson();

   @Override
-  public String map(final ActivePowerRecord value) throws Exception {
-    return GSON.toJson(value);
+  public String convert(final ActivePowerRecord activePowerRecord) {
+    return GSON.toJson(activePowerRecord);
   }
 }
package rocks.theodolite.benchmarks.uc1.commons.logger;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
* Writes string records to a {@link Logger}.
*/
public class LogWriter implements DatabaseWriter<String>, Serializable {
private static final long serialVersionUID = -5263671231838353749L; // NOPMD
private static final Logger LOGGER = LoggerFactory.getLogger(LogWriter.class);
@Override
public void write(final String string) {
LOGGER.info("Record: {}", string);
}
}
package rocks.theodolite.benchmarks.uc1.commons.logger;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
/**
* Provides factory methods for creating a dummy {@link DatabaseAdapter} writing records as logs
* using a SLF4J {@link Logger}.
*/
public final class LogWriterFactory {
private LogWriterFactory() {}
public static DatabaseAdapter<String> forJson() {
return DatabaseAdapter.from(new JsonConverter(), new LogWriter());
}
}
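
For context, a rough sketch of how the two halves of the returned DatabaseAdapter cooperate. getRecordConverter() and getDatabaseWriter() are the accessors used by the stream-processing adapters in this commit; the record values and the all-args ActivePowerRecord constructor below are assumptions for illustration:

final DatabaseAdapter<String> adapter = LogWriterFactory.forJson();
final ActivePowerRecord record = new ActivePowerRecord("sensor-1", 0L, 42.0); // hypothetical values
// Convert the record, then hand the result to the writer, as the engines do.
final String json = adapter.getRecordConverter().convert(record);
adapter.getDatabaseWriter().write(json); // logged as "Record: {...}"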
@@ -2,4 +2,8 @@ plugins {
   id 'theodolite.flink'
 }

+dependencies {
+  implementation project(':uc1-commons')
+}
+
 mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
package theodolite.uc1.application;
import org.apache.flink.api.common.functions.MapFunction;
import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link MapFunction} which wraps a {@link RecordConverter} to be used with Flink.
*
* @param <T> type the {@link RecordConverter} is associated with.
*/
public class ConverterAdapter<T> implements MapFunction<ActivePowerRecord, T> {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private final RecordConverter<T> recordConverter;
public ConverterAdapter(final RecordConverter<T> recordConverter) {
this.recordConverter = recordConverter;
}
@Override
public T map(final ActivePowerRecord record) throws Exception {
return this.recordConverter.convert(record);
}
}
@@ -7,6 +7,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.flink.KafkaConnectorFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -22,6 +24,8 @@ public final class HistoryServiceFlinkJob {

   private final StreamExecutionEnvironment env;
   private final String applicationId;

+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
   /**
    * Create a new instance of the {@link HistoryServiceFlinkJob}.
    */
@@ -69,9 +73,10 @@
     stream
         // .rebalance()
-        .map(new GsonMapper())
-        .flatMap((record, c) -> LOGGER.info("Record: {}", record))
-        .returns(Types.GENERIC(Object.class)); // Will never be used
+        .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
+        .returns(Types.STRING)
+        .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
+        .returns(Types.VOID); // Will never be used
   }
/**
......
package theodolite.uc1.application;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
* {@link FlatMapFunction} which wraps a {@link DatabaseAdapter} to be used with Flink.
*
* @param <T> type the {@link DatabaseWriter} is associated with.
*/
public class WriterAdapter<T> implements FlatMapFunction<T, Void> {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private final DatabaseWriter<T> databaseWriter;
public WriterAdapter(final DatabaseWriter<T> databaseWriter) {
this.databaseWriter = databaseWriter;
}
@Override
public void flatMap(final T value, final Collector<Void> out) throws Exception {
this.databaseWriter.write(value);
}
}
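
Taken together, ConverterAdapter and WriterAdapter plug the engine-agnostic DatabaseAdapter into a Flink topology. A self-contained sketch mirroring the HistoryServiceFlinkJob change above; the element source, record values, and job name are illustrative stand-ins for the benchmark's Kafka source:

public static void main(final String[] args) throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  final DatabaseAdapter<String> adapter = LogWriterFactory.forJson();
  env
      .fromElements(new ActivePowerRecord("sensor-1", 0L, 42.0)) // hypothetical record
      .map(new ConverterAdapter<>(adapter.getRecordConverter()))
      .returns(Types.STRING)
      .flatMap(new WriterAdapter<>(adapter.getDatabaseWriter()))
      .returns(Types.VOID); // sink type, never materialized
  env.execute("uc1-sketch");
}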
@@ -2,4 +2,8 @@ plugins {
   id 'theodolite.kstreams'
 }

+dependencies {
+  implementation project(':uc1-commons')
+}
+
 mainClassName = "theodolite.uc1.application.HistoryService"
 package theodolite.uc1.streamprocessing;

-import com.google.gson.Gson;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -16,12 +15,11 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public class TopologyBuilder {

   private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
-  private static final Gson GSON = new Gson();

   private final String inputTopic;
   private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();

   private final StreamsBuilder builder = new StreamsBuilder();
@@ -42,8 +40,8 @@
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
             this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
-        .mapValues(v -> GSON.toJson(v))
-        .foreach((k, record) -> LOGGER.info("Record: {}", record));
+        .mapValues(this.databaseAdapter.getRecordConverter()::convert)
+        .foreach((k, record) -> this.databaseAdapter.getDatabaseWriter().write(record));
     return this.builder.build(properties);
   }
......
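
For comparison, the same adapter pattern as a bare Kafka Streams topology outside the benchmark harness; the topic name is a hypothetical placeholder and serdes are left to the Streams defaults for brevity:

final DatabaseAdapter<String> adapter = LogWriterFactory.forJson();
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, ActivePowerRecord>stream("input") // hypothetical topic
    .mapValues(adapter.getRecordConverter()::convert)
    .foreach((k, json) -> adapter.getDatabaseWriter().write(json));
final Topology topology = builder.build();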