From ef8d6e2eb5d96821f0310b859bb078ae8deeca8a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 11 Feb 2022 20:23:08 +0100
Subject: [PATCH] Introduce generic interface for database storage

---
 .../src/main/groovy/theodolite.beam.gradle    |   1 -
 .../src/main/groovy/theodolite.flink.gradle   |   1 -
 .../main/groovy/theodolite.kstreams.gradle    |   1 -
 theodolite-benchmarks/settings.gradle         |  10 +-
 theodolite-benchmarks/uc1-beam/build.gradle   |   4 +-
 .../java/application/ConverterAdapter.java    |  33 +++++
 .../main/java/application/LogKeyValue.java    |  24 ----
 .../src/main/java/application/MapToGson.java  |  26 ----
 .../java/application/Uc1BeamPipeline.java     |  18 +--
 .../main/java/application/WriterAdapter.java  |  25 ++++
 .../.settings/org.eclipse.jdt.ui.prefs        | 127 ++++++++++++++++++
 .../qa.eclipse.plugin.checkstyle.prefs        |   4 +
 .../.settings/qa.eclipse.plugin.pmd.prefs     |   4 +
 .../uc1-commons/build.gradle                  |  22 +++
 .../uc1/commons/DatabaseAdapter.java          |  46 +++++++
 .../uc1/commons/DatabaseWriter.java           |  13 ++
 .../uc1/commons/RecordConverter.java          |  15 +++
 .../uc1/commons/logger/JsonConverter.java     |  22 +++
 .../uc1/commons/logger/LogWriter.java         |  22 +++
 .../uc1/commons/logger/LogWriterFactory.java  |  18 +++
 theodolite-benchmarks/uc1-flink/build.gradle  |   4 +
 .../uc1/application/ConverterAdapter.java     |  25 ++++
 .../uc1/application/GsonMapper.java           |  22 ---
 .../application/HistoryServiceFlinkJob.java   |  11 +-
 .../uc1/application/WriterAdapter.java        |  26 ++++
 .../uc1-kstreams/build.gradle                 |   4 +
 .../uc1/streamprocessing/TopologyBuilder.java |  14 +-
 27 files changed, 442 insertions(+), 100 deletions(-)
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java
 delete mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
 delete mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs
 create mode 100644 theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs
 create mode 100644 theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs
 create mode 100644 theodolite-benchmarks/uc1-commons/build.gradle
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java
 create mode 100644 theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java
 create mode 100644 theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java
 delete mode 100644 theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java
 create mode 100644 theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java

diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
index 4611062f1..5849bd932 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
@@ -24,7 +24,6 @@ dependencies {
     // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
     implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
     implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
-    implementation 'com.google.code.gson:gson:2.8.2'
     implementation 'com.google.guava:guava:24.1-jre'
     implementation 'org.slf4j:slf4j-simple:1.7.25'
     implementation project(':beam-commons')
diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle
index 258d1a82d..7671e6022 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle
@@ -41,7 +41,6 @@ dependencies {
 
     implementation 'org.apache.kafka:kafka-clients:2.2.0'
     implementation 'com.google.guava:guava:30.1-jre'
-    implementation 'com.google.code.gson:gson:2.8.2'
     implementation 'org.slf4j:slf4j-simple:1.6.1'
     implementation project(':flink-commons')
 
diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle
index 112ac6627..bf533915a 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle
@@ -23,7 +23,6 @@ dependencies {
     implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
     implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
     implementation 'org.apache.kafka:kafka-streams:3.1.0'
-    implementation 'com.google.code.gson:gson:2.8.2'
     implementation 'com.google.guava:guava:24.1-jre'
     implementation 'org.slf4j:slf4j-simple:1.7.25'
     implementation project(':kstreams-commons')
diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 2f6bd4b74..776e7d8e4 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -5,32 +5,32 @@ include 'kstreams-commons'
 include 'flink-commons'
 include 'beam-commons'
 
-include 'uc1-beam'
-include 'uc2-beam'
-include 'uc3-beam'
-include 'uc4-beam'
-
 include 'uc1-load-generator'
+include 'uc1-commons'
 include 'uc1-kstreams'
 include 'uc1-flink'
+include 'uc1-beam'
 include 'uc1-beam-flink'
 include 'uc1-beam-samza'
 
 include 'uc2-load-generator'
 include 'uc2-kstreams'
 include 'uc2-flink'
+include 'uc2-beam'
 include 'uc2-beam-flink'
 include 'uc2-beam-samza'
 
 include 'uc3-load-generator'
 include 'uc3-kstreams'
 include 'uc3-flink'
+include 'uc3-beam'
 include 'uc3-beam-flink'
 include 'uc3-beam-samza'
 
 include 'uc4-load-generator'
 include 'uc4-kstreams'
 include 'uc4-flink'
+include 'uc4-beam'
 include 'uc4-beam-flink'
 include 'uc4-beam-samza'
 
diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle
index 502e94fa7..ae80dcd8c 100644
--- a/theodolite-benchmarks/uc1-beam/build.gradle
+++ b/theodolite-benchmarks/uc1-beam/build.gradle
@@ -2,4 +2,6 @@ plugins {
   id 'theodolite.beam'
 }
 
-
+dependencies {
+    implementation project(':uc1-commons')
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java
new file mode 100644
index 000000000..b9d747fe7
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java
@@ -0,0 +1,33 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * {@link SimpleFunction} which wraps a {@link RecordConverter} to be used with Beam.
+ */
+public class ConverterAdapter<T> extends SimpleFunction<ActivePowerRecord, T> {
+
+  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
+
+  private final RecordConverter<T> recordConverter;
+  private final TypeDescriptor<T> type;
+
+  public ConverterAdapter(final RecordConverter<T> recordConverter, Class<T> type) {
+    this.recordConverter = recordConverter;
+    this.type = TypeDescriptor.of(type);
+  }
+
+  @Override
+  public T apply(final ActivePowerRecord record) {
+    return this.recordConverter.convert(record);
+  }
+
+  @Override
+  public TypeDescriptor<T> getOutputTypeDescriptor() {
+    return this.type;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
deleted file mode 100644
index 251523441..000000000
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package application;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.KV;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Logs all Key Value pairs.
- */
-public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
-  private static final long serialVersionUID = 4328743;
-  private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
-
-  /**
-   * Logs all key value pairs it processes.
-   */
-  @ProcessElement
-  public void processElement(@Element final KV<String, String> kv,
-      final OutputReceiver<KV<String, String>> out) {
-    LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
-    out.output(kv);
-  }
-}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java
deleted file mode 100644
index 6b0c6bc4d..000000000
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package application;
-
-import com.google.gson.Gson;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Converts a Map into a json String.
- */
-public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> {
-  private static final long serialVersionUID = 7168356203579050214L;
-  private transient Gson gsonObj = new Gson();
-
-  @Override
-  public KV<String, String> apply(
-      final KV<String, ActivePowerRecord> kv) {
-
-    if (this.gsonObj == null) {
-      this.gsonObj = new Gson();
-    }
-
-    final String gson = this.gsonObj.toJson(kv.getValue());
-    return KV.of(kv.getKey(), gson);
-  }
-}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index eaff08ac7..dde2bc064 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -6,7 +6,10 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.commons.configuration2.Configuration;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -22,6 +25,8 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public final class Uc1BeamPipeline extends AbstractPipeline {
 
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
   protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
 
@@ -36,17 +41,14 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
     final KafkaActivePowerTimestampReader kafka =
         new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
 
-    final LogKeyValue logKeyValue = new LogKeyValue();
-    final MapToGson mapToGson = new MapToGson();
-
     // Apply pipeline transformations
     // Read from Kafka
     this.apply(kafka)
-        // Map to Gson
-        .apply(MapElements
-            .via(mapToGson))
-        // Print to console
-        .apply(ParDo.of(logKeyValue));
+        .apply(Values.create())
+        .apply(MapElements.via(new ConverterAdapter<>(
+            this.databaseAdapter.getRecordConverter(),
+            String.class)))
+        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
   }
 }
 
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java
new file mode 100644
index 000000000..0cc22ef1a
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java
@@ -0,0 +1,25 @@
+package application;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
+
+/**
+ * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam.
+ */
+public class WriterAdapter<T> extends DoFn<T, Void> {
+
+  private static final long serialVersionUID = -5263671231838353742L; // NOPMD
+
+  private final DatabaseWriter<T> databaseWriter;
+
+  public WriterAdapter(final DatabaseWriter<T> databaseWriter) {
+    this.databaseWriter = databaseWriter;
+  }
+
+  @ProcessElement
+  public void processElement(@Element final T record, final OutputReceiver<Void> out) {
+    this.databaseWriter.write(record);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 000000000..713419c8d
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=21
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=false
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 000000000..87860c815
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 000000000..efbcb8c9e
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/uc1-commons/build.gradle b/theodolite-benchmarks/uc1-commons/build.gradle
new file mode 100644
index 000000000..cd95e28ed
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/build.gradle
@@ -0,0 +1,22 @@
+plugins {
+    id 'theodolite.java-commons'
+}
+
+repositories {
+  mavenCentral()
+  maven {
+    url "https://oss.sonatype.org/content/repositories/snapshots/"
+  }
+  maven {
+    url 'https://packages.confluent.io/maven/'
+  }
+}
+
+dependencies {
+  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
+  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+
+  implementation 'com.google.code.gson:gson:2.8.9'
+
+  testImplementation 'junit:junit:4.12'
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java
new file mode 100644
index 000000000..a1cb1ade0
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java
@@ -0,0 +1,46 @@
+package rocks.theodolite.benchmarks.uc1.commons;
+
+import java.util.Objects;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A database adapter consisting of a {@link RecordConverter} and a {@link DatabaseWriter}.
+ *
+ * @param <T> intermediate data type written to the database.
+ */
+public final class DatabaseAdapter<T> {
+
+  private final RecordConverter<T> recordConverter;
+
+  private final DatabaseWriter<T> databaseWriter;
+
+  private DatabaseAdapter(final RecordConverter<T> recordConverter,
+      final DatabaseWriter<T> databaseWriter) {
+    this.recordConverter = recordConverter;
+    this.databaseWriter = databaseWriter;
+  }
+
+  public RecordConverter<T> getRecordConverter() {
+    return this.recordConverter;
+  }
+
+  public DatabaseWriter<T> getDatabaseWriter() {
+    return this.databaseWriter;
+  }
+
+  /**
+   * Create a new {@link DatabaseAdapter}.
+   *
+   * @param <T> intermediate data type written to the database.
+   * @param recordConverter RecordConverter for converting {@link ActivePowerRecord}s to {@code T}
+   * @param databaseWriter DatabaseWriter for writing converted records to the database.
+   * @return the {@link DatabaseAdapter}.
+   */
+  public static <T> DatabaseAdapter<T> from(final RecordConverter<T> recordConverter,
+      final DatabaseWriter<T> databaseWriter) {
+    Objects.requireNonNull(recordConverter);
+    Objects.requireNonNull(databaseWriter);
+    return new DatabaseAdapter<>(recordConverter, databaseWriter);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java
new file mode 100644
index 000000000..1beb269e4
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java
@@ -0,0 +1,13 @@
+package rocks.theodolite.benchmarks.uc1.commons;
+
+/**
+ * Writes an object to a database.
+ *
+ * @param <T> Type expected by the database.
+ */
+@FunctionalInterface
+public interface DatabaseWriter<T> {
+
+  void write(T record);
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java
new file mode 100644
index 000000000..105f19e0e
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java
@@ -0,0 +1,15 @@
+package rocks.theodolite.benchmarks.uc1.commons;
+
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Converts an {@link ActivePowerRecord} to the type required by a database.
+ *
+ * @param <T> Type required by the database.
+ */
+@FunctionalInterface
+public interface RecordConverter<T> {
+
+  T convert(ActivePowerRecord record);
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java
new file mode 100644
index 000000000..f9974affb
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java
@@ -0,0 +1,22 @@
+package rocks.theodolite.benchmarks.uc1.commons.logger;
+
+import com.google.gson.Gson;
+import java.io.Serializable;
+import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * {@link RecordConverter} that converts {@link ActivePowerRecord}s to JSON strings.
+ */
+public class JsonConverter implements RecordConverter<String>, Serializable {
+
+  private static final long serialVersionUID = -5263671231838353748L; // NOPMD
+
+  private static final Gson GSON = new Gson();
+
+  @Override
+  public String convert(final ActivePowerRecord activePowerRecord) {
+    return GSON.toJson(activePowerRecord);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java
new file mode 100644
index 000000000..d606a6dff
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java
@@ -0,0 +1,22 @@
+package rocks.theodolite.benchmarks.uc1.commons.logger;
+
+import java.io.Serializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
+
+/**
+ * Writes string records to a {@link Logger}.
+ */
+public class LogWriter implements DatabaseWriter<String>, Serializable {
+
+  private static final long serialVersionUID = -5263671231838353749L; // NOPMD
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LogWriter.class);
+
+  @Override
+  public void write(final String string) {
+    LOGGER.info("Record: {}", string);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java
new file mode 100644
index 000000000..305ed933b
--- /dev/null
+++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java
@@ -0,0 +1,18 @@
+package rocks.theodolite.benchmarks.uc1.commons.logger;
+
+import org.slf4j.Logger;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+
+/**
+ * Provides factory methods for creating a dummy {@link DatabaseAdapter} writing records as logs
+ * using a SLF4J {@link Logger}.
+ */
+public final class LogWriterFactory {
+
+  private LogWriterFactory() {}
+
+  public static DatabaseAdapter<String> forJson() {
+    return DatabaseAdapter.from(new JsonConverter(), new LogWriter());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-flink/build.gradle b/theodolite-benchmarks/uc1-flink/build.gradle
index 8a2a359c4..681effe9a 100644
--- a/theodolite-benchmarks/uc1-flink/build.gradle
+++ b/theodolite-benchmarks/uc1-flink/build.gradle
@@ -2,4 +2,8 @@ plugins {
   id 'theodolite.flink'
 }
 
+dependencies {
+    implementation project(':uc1-commons')
+}
+
 mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java
new file mode 100644
index 000000000..d8b833169
--- /dev/null
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java
@@ -0,0 +1,25 @@
+package theodolite.uc1.application;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import rocks.theodolite.benchmarks.uc1.commons.RecordConverter;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * {@link MapFunction} which wraps a {@link RecordConverter} to be used with Flink.
+ */
+public class ConverterAdapter<T> implements MapFunction<ActivePowerRecord, T> {
+
+  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
+
+  private final RecordConverter<T> recordConverter;
+
+  public ConverterAdapter(final RecordConverter<T> recordConverter) {
+    this.recordConverter = recordConverter;
+  }
+
+  @Override
+  public T map(final ActivePowerRecord record) throws Exception {
+    return this.recordConverter.convert(record);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java
deleted file mode 100644
index 831db7fe6..000000000
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package theodolite.uc1.application;
-
-import com.google.gson.Gson;
-import org.apache.flink.api.common.functions.MapFunction;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
- * strings.
- */
-public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
-
-  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
-
-  private static final Gson GSON = new Gson();
-
-  @Override
-  public String map(final ActivePowerRecord value) throws Exception {
-    return GSON.toJson(value);
-  }
-
-}
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 0cb132e52..411311527 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -7,6 +7,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.flink.KafkaConnectorFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -22,6 +24,8 @@ public final class HistoryServiceFlinkJob {
   private final StreamExecutionEnvironment env;
   private final String applicationId;
 
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
   /**
    * Create a new instance of the {@link HistoryServiceFlinkJob}.
    */
@@ -69,9 +73,10 @@ public final class HistoryServiceFlinkJob {
 
     stream
         // .rebalance()
-        .map(new GsonMapper())
-        .flatMap((record, c) -> LOGGER.info("Record: {}", record))
-        .returns(Types.GENERIC(Object.class)); // Will never be used
+        .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
+        .returns(Types.STRING)
+        .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
+        .returns(Types.VOID); // Will never be used
   }
 
   /**
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java
new file mode 100644
index 000000000..7974fd19b
--- /dev/null
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java
@@ -0,0 +1,26 @@
+package theodolite.uc1.application;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
+
+/**
+ * {@link FlatMapFunction} which wraps a {@link DatabaseAdapter} to be used with Flink.
+ */
+public class WriterAdapter<T> implements FlatMapFunction<T, Void> {
+
+  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
+
+  private final DatabaseWriter<T> databaseWriter;
+
+  public WriterAdapter(final DatabaseWriter<T> databaseWriter) {
+    this.databaseWriter = databaseWriter;
+  }
+
+  @Override
+  public void flatMap(final T value, final Collector<Void> out) throws Exception {
+    this.databaseWriter.write(value);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-kstreams/build.gradle b/theodolite-benchmarks/uc1-kstreams/build.gradle
index 74cfb450e..1460a99a2 100644
--- a/theodolite-benchmarks/uc1-kstreams/build.gradle
+++ b/theodolite-benchmarks/uc1-kstreams/build.gradle
@@ -2,4 +2,8 @@ plugins {
   id 'theodolite.kstreams'
 }
 
+dependencies {
+    implementation project(':uc1-commons')
+}
+
 mainClassName = "theodolite.uc1.application.HistoryService"
diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
index 427a838f4..64d6d08c3 100644
--- a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
@@ -1,13 +1,12 @@
 package theodolite.uc1.streamprocessing;
 
-import com.google.gson.Gson;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import titan.ccp.model.records.ActivePowerRecord;
 
@@ -16,12 +15,11 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public class TopologyBuilder {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
-  private static final Gson GSON = new Gson();
-
   private final String inputTopic;
   private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
 
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+
   private final StreamsBuilder builder = new StreamsBuilder();
 
 
@@ -42,8 +40,8 @@ public class TopologyBuilder {
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
             this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
-        .mapValues(v -> GSON.toJson(v))
-        .foreach((k, record) -> LOGGER.info("Record: {}", record));
+        .mapValues(this.databaseAdapter.getRecordConverter()::convert)
+        .foreach((k, record) -> this.databaseAdapter.getDatabaseWriter().write(record));
 
     return this.builder.build(properties);
   }
-- 
GitLab