Skip to content
Snippets Groups Projects
Commit 584f5617 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'uc1-database-interface' into 'master'

Add generic database interface in UC1 to support different storage types

Closes #343

See merge request !238
parents 2f2a0eff 03a7f544
No related branches found
No related tags found
1 merge request!238Add generic database interface in UC1 to support different storage types
Pipeline #6593 failed
Showing
with 379 additions and 67 deletions
......@@ -24,7 +24,6 @@ dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation project(':beam-commons')
......
......@@ -41,7 +41,6 @@ dependencies {
implementation 'org.apache.kafka:kafka-clients:2.2.0'
implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'org.slf4j:slf4j-simple:1.6.1'
implementation project(':flink-commons')
......
......@@ -23,7 +23,6 @@ dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:3.1.0'
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation project(':kstreams-commons')
......
......@@ -5,32 +5,32 @@ include 'kstreams-commons'
include 'flink-commons'
include 'beam-commons'
include 'uc1-beam'
include 'uc2-beam'
include 'uc3-beam'
include 'uc4-beam'
include 'uc1-load-generator'
include 'uc1-commons'
include 'uc1-kstreams'
include 'uc1-flink'
include 'uc1-beam'
include 'uc1-beam-flink'
include 'uc1-beam-samza'
include 'uc2-load-generator'
include 'uc2-kstreams'
include 'uc2-flink'
include 'uc2-beam'
include 'uc2-beam-flink'
include 'uc2-beam-samza'
include 'uc3-load-generator'
include 'uc3-kstreams'
include 'uc3-flink'
include 'uc3-beam'
include 'uc3-beam-flink'
include 'uc3-beam-samza'
include 'uc4-load-generator'
include 'uc4-kstreams'
include 'uc4-flink'
include 'uc4-beam'
include 'uc4-beam-flink'
include 'uc4-beam-samza'
......
......@@ -2,4 +2,6 @@ plugins {
id 'theodolite.beam'
}
dependencies {
implementation project(':uc1-commons')
}
package application;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.TypeDescriptor;
import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link SimpleFunction} which wraps a {@link RecordConverter} to be used with Beam.
*
* @param <T> type the {@link RecordConverter} is associated with.
*/
public class ConverterAdapter<T> extends SimpleFunction<ActivePowerRecord, T> {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private final RecordConverter<T> recordConverter;
private final TypeDescriptor<T> type;
/**
* Create a new {@link ConverterAdapter} with a given {@link RecordConverter} and the associated
* type.
*/
public ConverterAdapter(final RecordConverter<T> recordConverter, final Class<T> type) {
super();
this.recordConverter = recordConverter;
this.type = TypeDescriptor.of(type);
}
@Override
public T apply(final ActivePowerRecord record) {
return this.recordConverter.convert(record);
}
@Override
public TypeDescriptor<T> getOutputTypeDescriptor() {
return this.type;
}
}
package application;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Logs all Key Value pairs.
*/
public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
private static final long serialVersionUID = 4328743;
private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
/**
* Logs all key value pairs it processes.
*/
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
out.output(kv);
}
}
package application;
import com.google.gson.Gson;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Converts a Map into a json String.
*/
public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> {
private static final long serialVersionUID = 7168356203579050214L;
private transient Gson gsonObj = new Gson();
@Override
public KV<String, String> apply(
final KV<String, ActivePowerRecord> kv) {
if (this.gsonObj == null) {
this.gsonObj = new Gson();
}
final String gson = this.gsonObj.toJson(kv.getValue());
return KV.of(kv.getKey(), gson);
}
}
......@@ -6,7 +6,10 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -22,6 +25,8 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public final class Uc1BeamPipeline extends AbstractPipeline {
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
super(options, config);
......@@ -36,17 +41,14 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
final KafkaActivePowerTimestampReader kafka =
new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
final LogKeyValue logKeyValue = new LogKeyValue();
final MapToGson mapToGson = new MapToGson();
// Apply pipeline transformations
// Read from Kafka
this.apply(kafka)
// Map to Gson
.apply(MapElements
.via(mapToGson))
// Print to console
.apply(ParDo.of(logKeyValue));
.apply(Values.create())
.apply(MapElements.via(new ConverterAdapter<>(
this.databaseAdapter.getRecordConverter(),
String.class)))
.apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
}
}
package application;
import org.apache.beam.sdk.transforms.DoFn;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
* {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam.
*
* @param <T> type the {@link DatabaseWriter} is associated with.
*/
public class WriterAdapter<T> extends DoFn<T, Void> {
private static final long serialVersionUID = -5263671231838353742L; // NOPMD
private final DatabaseWriter<T> databaseWriter;
public WriterAdapter(final DatabaseWriter<T> databaseWriter) {
super();
this.databaseWriter = databaseWriter;
}
@ProcessElement
public void processElement(@Element final T record, final OutputReceiver<Void> out) {
this.databaseWriter.write(record);
}
}
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
plugins {
id 'theodolite.java-commons'
}
repositories {
mavenCentral()
maven {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
maven {
url 'https://packages.confluent.io/maven/'
}
}
dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.code.gson:gson:2.8.9'
testImplementation 'junit:junit:4.12'
}
package rocks.theodolite.benchmarks.uc1.commons;
import java.util.Objects;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A database adapter consisting of a {@link RecordConverter} and a {@link DatabaseWriter}.
*
* @param <T> intermediate data type written to the database.
*/
public final class DatabaseAdapter<T> {
private final RecordConverter<T> recordConverter;
private final DatabaseWriter<T> databaseWriter;
private DatabaseAdapter(final RecordConverter<T> recordConverter,
final DatabaseWriter<T> databaseWriter) {
this.recordConverter = recordConverter;
this.databaseWriter = databaseWriter;
}
public RecordConverter<T> getRecordConverter() {
return this.recordConverter;
}
public DatabaseWriter<T> getDatabaseWriter() {
return this.databaseWriter;
}
/**
* Create a new {@link DatabaseAdapter}.
*
* @param <T> intermediate data type written to the database.
* @param recordConverter RecordConverter for converting {@link ActivePowerRecord}s to {@code T}
* @param databaseWriter DatabaseWriter for writing converted records to the database.
* @return the {@link DatabaseAdapter}.
*/
public static <T> DatabaseAdapter<T> from(final RecordConverter<T> recordConverter,
final DatabaseWriter<T> databaseWriter) {
Objects.requireNonNull(recordConverter);
Objects.requireNonNull(databaseWriter);
return new DatabaseAdapter<>(recordConverter, databaseWriter);
}
}
package rocks.theodolite.benchmarks.uc1.commons;
/**
* Writes an object to a database.
*
* @param <T> Type expected by the database.
*/
@FunctionalInterface
public interface DatabaseWriter<T> {
void write(T record);
}
package rocks.theodolite.benchmarks.uc1.commons;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Converts an {@link ActivePowerRecord} to the type required by a database.
*
* @param <T> Type required by the database.
*/
@FunctionalInterface
public interface RecordConverter<T> {
T convert(ActivePowerRecord record);
}
package theodolite.uc1.application;
package rocks.theodolite.benchmarks.uc1.commons.logger;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import java.io.Serializable;
import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
* strings.
* {@link RecordConverter} that converts {@link ActivePowerRecord}s to JSON strings.
*/
public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
public class JsonConverter implements RecordConverter<String>, Serializable {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private static final long serialVersionUID = -5263671231838353748L; // NOPMD
private static final Gson GSON = new Gson();
@Override
public String map(final ActivePowerRecord value) throws Exception {
return GSON.toJson(value);
public String convert(final ActivePowerRecord activePowerRecord) {
return GSON.toJson(activePowerRecord);
}
}
package rocks.theodolite.benchmarks.uc1.commons.logger;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
* Writes string records to a {@link Logger}.
*/
public class LogWriter implements DatabaseWriter<String>, Serializable {
private static final long serialVersionUID = -5263671231838353749L; // NOPMD
private static final Logger LOGGER = LoggerFactory.getLogger(LogWriter.class);
@Override
public void write(final String string) {
LOGGER.info("Record: {}", string);
}
}
package rocks.theodolite.benchmarks.uc1.commons.logger;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
/**
* Provides factory methods for creating a dummy {@link DatabaseAdapter} writing records as logs
* using a SLF4J {@link Logger}.
*/
public final class LogWriterFactory {
private LogWriterFactory() {}
public static DatabaseAdapter<String> forJson() {
return DatabaseAdapter.from(new JsonConverter(), new LogWriter());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment