diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle index 4611062f1b09ff2dbad02f93b9cc7f9920c32f5e..5849bd93221794d135f1c6cb3bcb62d2174724b5 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle @@ -24,7 +24,6 @@ dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } - implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-simple:1.7.25' implementation project(':beam-commons') diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle index 258d1a82d002184fe96a9df19b7d99806da50d28..7671e602211b6d9e923a3b2a4c87f40fff84c6ec 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle @@ -41,7 +41,6 @@ dependencies { implementation 'org.apache.kafka:kafka-clients:2.2.0' implementation 'com.google.guava:guava:30.1-jre' - implementation 'com.google.code.gson:gson:2.8.2' implementation 'org.slf4j:slf4j-simple:1.6.1' implementation project(':flink-commons') diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle index 112ac662798d5a1e41f146014dd95bdaaba3a264..bf533915a8fdf4a712754857702373264a30f80a 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle @@ -23,7 +23,6 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:3.1.0' - implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-simple:1.7.25' implementation project(':kstreams-commons') diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle index 2f6bd4b74cbf103b740d8d8f57e38ac9deafd3cf..776e7d8e4fe132839b6e27c70c368720415721ea 100644 --- a/theodolite-benchmarks/settings.gradle +++ b/theodolite-benchmarks/settings.gradle @@ -5,32 +5,32 @@ include 'kstreams-commons' include 'flink-commons' include 'beam-commons' -include 'uc1-beam' -include 'uc2-beam' -include 'uc3-beam' -include 'uc4-beam' - include 'uc1-load-generator' +include 'uc1-commons' include 'uc1-kstreams' include 'uc1-flink' +include 'uc1-beam' include 'uc1-beam-flink' include 'uc1-beam-samza' include 'uc2-load-generator' include 'uc2-kstreams' include 'uc2-flink' +include 'uc2-beam' include 'uc2-beam-flink' include 'uc2-beam-samza' include 'uc3-load-generator' include 'uc3-kstreams' include 'uc3-flink' +include 'uc3-beam' include 'uc3-beam-flink' include 'uc3-beam-samza' include 'uc4-load-generator' include 'uc4-kstreams' include 'uc4-flink' +include 'uc4-beam' include 'uc4-beam-flink' include 'uc4-beam-samza' diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index 502e94fa737fb2ae1bab861407b27575cd8766ca..ae80dcd8c5b263892f4539098a9a29b25c133819 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -2,4 +2,6 @@ plugins { id 'theodolite.beam' } - +dependencies { + implementation project(':uc1-commons') +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..b9d747fe7ac52003cafd848a75fbcd4cbc1eb936 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java @@ -0,0 +1,33 @@ +package application; + +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.TypeDescriptor; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link SimpleFunction} which wraps a {@link RecordConverter} to be used with Beam. + */ +public class ConverterAdapter<T> extends SimpleFunction<ActivePowerRecord, T> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final RecordConverter<T> recordConverter; + private final TypeDescriptor<T> type; + + public ConverterAdapter(final RecordConverter<T> recordConverter, Class<T> type) { + this.recordConverter = recordConverter; + this.type = TypeDescriptor.of(type); + } + + @Override + public T apply(final ActivePowerRecord record) { + return this.recordConverter.convert(record); + } + + @Override + public TypeDescriptor<T> getOutputTypeDescriptor() { + return this.type; + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java deleted file mode 100644 index 251523441e339cbaf58c7e3a1b30e97cc354df18..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java +++ /dev/null @@ -1,24 +0,0 @@ -package application; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.KV; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Logs all Key Value pairs. - */ -public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { - private static final long serialVersionUID = 4328743; - private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class); - - /** - * Logs all key value pairs it processes. - */ - @ProcessElement - public void processElement(@Element final KV<String, String> kv, - final OutputReceiver<KV<String, String>> out) { - LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue()); - out.output(kv); - } -} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java deleted file mode 100644 index 6b0c6bc4ddfe78c22028da5b8cf7dde7ed57fced..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java +++ /dev/null @@ -1,26 +0,0 @@ -package application; - -import com.google.gson.Gson; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Converts a Map into a json String. - */ -public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> { - private static final long serialVersionUID = 7168356203579050214L; - private transient Gson gsonObj = new Gson(); - - @Override - public KV<String, String> apply( - final KV<String, ActivePowerRecord> kv) { - - if (this.gsonObj == null) { - this.gsonObj = new Gson(); - } - - final String gson = this.gsonObj.toJson(kv.getValue()); - return KV.of(kv.getKey(), gson); - } -} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index eaff08ac78cd18ddfd47eb2949ca13340ecc27b8..dde2bc064af9b21e11b859fdbf3f6ba1374d17d7 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -6,7 +6,10 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; @@ -22,6 +25,8 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc1BeamPipeline extends AbstractPipeline { + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); @@ -36,17 +41,14 @@ public final class Uc1BeamPipeline extends AbstractPipeline { final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); - final LogKeyValue logKeyValue = new LogKeyValue(); - final MapToGson mapToGson = new MapToGson(); - // Apply pipeline transformations // Read from Kafka this.apply(kafka) - // Map to Gson - .apply(MapElements - .via(mapToGson)) - // Print to console - .apply(ParDo.of(logKeyValue)); + .apply(Values.create()) + .apply(MapElements.via(new ConverterAdapter<>( + this.databaseAdapter.getRecordConverter(), + String.class))) + .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); } } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..0cc22ef1a0e329252b6586ee3cf9044f8d06e165 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java @@ -0,0 +1,25 @@ +package application; + +import org.apache.beam.sdk.transforms.DoFn; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + +/** + * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. + */ +public class WriterAdapter<T> extends DoFn<T, Void> { + + private static final long serialVersionUID = -5263671231838353742L; // NOPMD + + private final DatabaseWriter<T> databaseWriter; + + public WriterAdapter(final DatabaseWriter<T> databaseWriter) { + this.databaseWriter = databaseWriter; + } + + @ProcessElement + public void processElement(@Element final T record, final OutputReceiver<Void> out) { + this.databaseWriter.write(record); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..713419c8d3d74d3bd7fd05c3e839367753fcdee0 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=21 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/theodolite-benchmarks/uc1-commons/build.gradle b/theodolite-benchmarks/uc1-commons/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..cd95e28ed53cc8e33a416564e613574faf1fc6cb --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/build.gradle @@ -0,0 +1,22 @@ +plugins { + id 'theodolite.java-commons' +} + +repositories { + mavenCentral() + maven { + url "https://oss.sonatype.org/content/repositories/snapshots/" + } + maven { + url 'https://packages.confluent.io/maven/' + } +} + +dependencies { + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + + implementation 'com.google.code.gson:gson:2.8.9' + + testImplementation 'junit:junit:4.12' +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..a1cb1ade0dc76b168cf9ee54f64d5ac88d6b3a98 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java @@ -0,0 +1,46 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +import java.util.Objects; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A database adapter consisting of a {@link RecordConverter} and a {@link DatabaseWriter}. + * + * @param <T> intermediate data type written to the database. + */ +public final class DatabaseAdapter<T> { + + private final RecordConverter<T> recordConverter; + + private final DatabaseWriter<T> databaseWriter; + + private DatabaseAdapter(final RecordConverter<T> recordConverter, + final DatabaseWriter<T> databaseWriter) { + this.recordConverter = recordConverter; + this.databaseWriter = databaseWriter; + } + + public RecordConverter<T> getRecordConverter() { + return this.recordConverter; + } + + public DatabaseWriter<T> getDatabaseWriter() { + return this.databaseWriter; + } + + /** + * Create a new {@link DatabaseAdapter}. + * + * @param <T> intermediate data type written to the database. + * @param recordConverter RecordConverter for converting {@link ActivePowerRecord}s to {@code T} + * @param databaseWriter DatabaseWriter for writing converted records to the database. + * @return the {@link DatabaseAdapter}. + */ + public static <T> DatabaseAdapter<T> from(final RecordConverter<T> recordConverter, + final DatabaseWriter<T> databaseWriter) { + Objects.requireNonNull(recordConverter); + Objects.requireNonNull(databaseWriter); + return new DatabaseAdapter<>(recordConverter, databaseWriter); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..1beb269e4b75252ac72f7c30c4a26f7a11de4fb6 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java @@ -0,0 +1,13 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +/** + * Writes an object to a database. + * + * @param <T> Type expected by the database. + */ +@FunctionalInterface +public interface DatabaseWriter<T> { + + void write(T record); + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java new file mode 100644 index 0000000000000000000000000000000000000000..105f19e0e920e3516f7277cd7804dae210a7d0b1 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java @@ -0,0 +1,15 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Converts an {@link ActivePowerRecord} to the type required by a database. + * + * @param <T> Type required by the database. + */ +@FunctionalInterface +public interface RecordConverter<T> { + + T convert(ActivePowerRecord record); + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java new file mode 100644 index 0000000000000000000000000000000000000000..f9974affb7bf57fc63e9bfe8ba92fd056da9a97b --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import com.google.gson.Gson; +import java.io.Serializable; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link RecordConverter} that converts {@link ActivePowerRecord}s to JSON strings. + */ +public class JsonConverter implements RecordConverter<String>, Serializable { + + private static final long serialVersionUID = -5263671231838353748L; // NOPMD + + private static final Gson GSON = new Gson(); + + @Override + public String convert(final ActivePowerRecord activePowerRecord) { + return GSON.toJson(activePowerRecord); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..d606a6dffd01257b308bf2afebc3088b52793ccf --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import java.io.Serializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + +/** + * Writes string records to a {@link Logger}. + */ +public class LogWriter implements DatabaseWriter<String>, Serializable { + + private static final long serialVersionUID = -5263671231838353749L; // NOPMD + + private static final Logger LOGGER = LoggerFactory.getLogger(LogWriter.class); + + @Override + public void write(final String string) { + LOGGER.info("Record: {}", string); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..305ed933ba3e0d885de9c65aacc6ace8a0884621 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java @@ -0,0 +1,18 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import org.slf4j.Logger; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; + +/** + * Provides factory methods for creating a dummy {@link DatabaseAdapter} writing records as logs + * using a SLF4J {@link Logger}. + */ +public final class LogWriterFactory { + + private LogWriterFactory() {} + + public static DatabaseAdapter<String> forJson() { + return DatabaseAdapter.from(new JsonConverter(), new LogWriter()); + } + +} diff --git a/theodolite-benchmarks/uc1-flink/build.gradle b/theodolite-benchmarks/uc1-flink/build.gradle index 8a2a359c4840e67581f7bc24f1544ff519f82525..681effe9a347f0fa9f26d6a2caf0668ade09d6c2 100644 --- a/theodolite-benchmarks/uc1-flink/build.gradle +++ b/theodolite-benchmarks/uc1-flink/build.gradle @@ -2,4 +2,8 @@ plugins { id 'theodolite.flink' } +dependencies { + implementation project(':uc1-commons') +} + mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob" diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..d8b833169ed38dfd811b3d09b8e250d2774f40b0 --- /dev/null +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java @@ -0,0 +1,25 @@ +package theodolite.uc1.application; + +import org.apache.flink.api.common.functions.MapFunction; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link MapFunction} which wraps a {@link RecordConverter} to be used with Flink. + */ +public class ConverterAdapter<T> implements MapFunction<ActivePowerRecord, T> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final RecordConverter<T> recordConverter; + + public ConverterAdapter(final RecordConverter<T> recordConverter) { + this.recordConverter = recordConverter; + } + + @Override + public T map(final ActivePowerRecord record) throws Exception { + return this.recordConverter.convert(record); + } + +} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java deleted file mode 100644 index 831db7fe63be6529e6b7ba299dca92b138ff7d13..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java +++ /dev/null @@ -1,22 +0,0 @@ -package theodolite.uc1.application; - -import com.google.gson.Gson; -import org.apache.flink.api.common.functions.MapFunction; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON - * strings. - */ -public class GsonMapper implements MapFunction<ActivePowerRecord, String> { - - private static final long serialVersionUID = -5263671231838353747L; // NOPMD - - private static final Gson GSON = new Gson(); - - @Override - public String map(final ActivePowerRecord value) throws Exception { - return GSON.toJson(value); - } - -} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 0cb132e526486e71409736b843dd25bdfa52da4a..41131152734f68dd34489461b1ad31d94a970eac 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -7,6 +7,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import theodolite.commons.flink.KafkaConnectorFactory; import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.model.records.ActivePowerRecord; @@ -22,6 +24,8 @@ public final class HistoryServiceFlinkJob { private final StreamExecutionEnvironment env; private final String applicationId; + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + /** * Create a new instance of the {@link HistoryServiceFlinkJob}. */ @@ -69,9 +73,10 @@ public final class HistoryServiceFlinkJob { stream // .rebalance() - .map(new GsonMapper()) - .flatMap((record, c) -> LOGGER.info("Record: {}", record)) - .returns(Types.GENERIC(Object.class)); // Will never be used + .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter())) + .returns(Types.STRING) + .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())) + .returns(Types.VOID); // Will never be used } /** diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..7974fd19b05c508dc970e6bd5173b0e55c5df561 --- /dev/null +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java @@ -0,0 +1,26 @@ +package theodolite.uc1.application; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + +/** + * {@link FlatMapFunction} which wraps a {@link DatabaseAdapter} to be used with Flink. + */ +public class WriterAdapter<T> implements FlatMapFunction<T, Void> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final DatabaseWriter<T> databaseWriter; + + public WriterAdapter(final DatabaseWriter<T> databaseWriter) { + this.databaseWriter = databaseWriter; + } + + @Override + public void flatMap(final T value, final Collector<Void> out) throws Exception { + this.databaseWriter.write(value); + } + +} diff --git a/theodolite-benchmarks/uc1-kstreams/build.gradle b/theodolite-benchmarks/uc1-kstreams/build.gradle index 74cfb450ec80759f60582c25ab844e3398d5bf02..1460a99a2aad7767b84259494c4c231344862545 100644 --- a/theodolite-benchmarks/uc1-kstreams/build.gradle +++ b/theodolite-benchmarks/uc1-kstreams/build.gradle @@ -2,4 +2,8 @@ plugins { id 'theodolite.kstreams' } +dependencies { + implementation project(':uc1-commons') +} + mainClassName = "theodolite.uc1.application.HistoryService" diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 427a838f45f6807ede00dcb68ebf8c5580f28ce6..64d6d08c30c1a015c668e744fe164bda3f493aa4 100644 --- a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -1,13 +1,12 @@ package theodolite.uc1.streamprocessing; -import com.google.gson.Gson; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.model.records.ActivePowerRecord; @@ -16,12 +15,11 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - private static final Gson GSON = new Gson(); - private final String inputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + private final StreamsBuilder builder = new StreamsBuilder(); @@ -42,8 +40,8 @@ public class TopologyBuilder { .stream(this.inputTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .mapValues(v -> GSON.toJson(v)) - .foreach((k, record) -> LOGGER.info("Record: {}", record)); + .mapValues(this.databaseAdapter.getRecordConverter()::convert) + .foreach((k, record) -> this.databaseAdapter.getDatabaseWriter().write(record)); return this.builder.build(properties); }