diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a8cf4930e1f07fef3ca39971893d259d8faa65db..babcf13f2762227ef349a0ec569fef30daadd0a0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -84,26 +84,37 @@ test-docs-links: - build-docs script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site +build-docs-crds: + stage: build + image: + name: ghcr.io/fybrik/crdoc:0.6.1 + entrypoint: [""] + script: /crdoc --resources theodolite/crd/ --template docs/api-reference/crds.tmpl --output docs/api-reference/crds.ref.md + artifacts: + paths: + - docs/api-reference/crds.ref.md + expire_in: 1 week + rules: + - changes: + - docs/api-reference/crds.tmpl + - theodolite/crd/**/* + - when: manual + allow_failure: true + test-docs-crds-regression: stage: test - image: golang + needs: + - build-docs-crds + image: alpine:3.15 before_script: - cd docs - - go install fybrik.io/crdoc@latest script: - - crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --output api-reference/crds.ref.md - cmp api-reference/crds.md api-reference/crds.ref.md artifacts: when: on_failure paths: - docs/api-reference/crds.ref.md expire_in: 1 week - rules: - - changes: - - docs/api-reference/crds.tmpl - - theodolite/crd/**/* - - when: manual - allow_failure: true # Theodolite Helm Chart diff --git a/docs/README.md b/docs/README.md index 52b5311295e5a96721d9aa42f7e9c319da06960c..a19f94305dfdcb1de7c46da98afbb52b28a6bfa0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -39,5 +39,5 @@ crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --outpu With the following command, crdoc is executed in Docker: ```sh -docker run --rm -v "`pwd`/../theodolite/crd/":/crd -u $UID -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.0 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md +docker run --rm -v "`pwd`/../theodolite/crd/":/crd -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.1 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md ``` diff --git a/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..60b9977149c7b281cb2ac91ee282f73d4351e348 --- /dev/null +++ b/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=true +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=21 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=true +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false \ No newline at end of file diff --git a/theodolite-benchmarks/beam-commons/build.gradle b/theodolite-benchmarks/beam-commons/build.gradle index a809f6bc4b97d8d62b807243eddecda8a5de5032..a15b388a33ce53b0f1b8f2f0567996917b9f18d1 100644 --- a/theodolite-benchmarks/beam-commons/build.gradle +++ b/theodolite-benchmarks/beam-commons/build.gradle @@ -13,21 +13,19 @@ repositories { } dependencies { - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } - implementation 'com.google.code.gson:gson:2.8.2' - implementation 'com.google.guava:guava:24.1-jre' + implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0' implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){ exclude group: 'org.apache.kafka', module: 'kafka-clients' } + implementation ('io.confluent:kafka-streams-avro-serde:5.3.2') + implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30' - implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0' - + runtimeOnly 'org.slf4j:slf4j-api:1.7.32' runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32' - // Use JUnit test framework testImplementation 'junit:junit:4.12' } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index c936ce918c10f3c500cdd26f7e057cd7b6c555b6..e67c5f60b6401b4ecd1f42b2a184afbc4654f425 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -12,6 +12,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; */ public class AbstractPipeline extends Pipeline { + private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; + private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; protected final String inputTopic; protected final String bootstrapServer; // Application Configurations @@ -21,8 +23,8 @@ public class AbstractPipeline extends Pipeline { super(options); this.config = config; - inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); + this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); } /** @@ -32,19 +34,37 @@ public class AbstractPipeline extends Pipeline { */ public Map<String, Object> buildConsumerConfig() { final Map<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - config - .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); - consumerConfig.put("schema.registry.url", - config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); - - consumerConfig.put("specific.avro.reader", - config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); - - final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName); + consumerConfig.put( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); + consumerConfig.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + consumerConfig.put( + KAFKA_CONFIG_SCHEMA_REGISTRY_URL, + this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); + consumerConfig.put( + KAFKA_CONFIG_SPECIFIC_AVRO_READER, + this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); + consumerConfig.put( + ConsumerConfig.GROUP_ID_CONFIG, + this.config.getString(ConfigurationKeys.APPLICATION_NAME)); return consumerConfig; } + + /** + * Builds a simple configuration for a Kafka producer transformation. + * + * @return the build configuration. + */ + public Map<String, Object> buildProducerConfig() { + final Map<String, Object> config = new HashMap<>(); + config.put( + KAFKA_CONFIG_SCHEMA_REGISTRY_URL, + this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); + config.put( + KAFKA_CONFIG_SPECIFIC_AVRO_READER, + this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); + return config; + } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..c53dde3d5f4b7d18822c916a637c356b898fe2cd --- /dev/null +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java @@ -0,0 +1,11 @@ +package theodolite.commons.beam.kafka; + +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}. + */ +public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> { +} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java deleted file mode 100644 index f102bee41d66c251ecb66418dd3b90dced32cffb..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java +++ /dev/null @@ -1,61 +0,0 @@ -package theodolite.commons.beam.kafka; - -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Map; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.kafka.common.serialization.StringDeserializer; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. - */ -public class KafkaActivePowerRecordReader extends - PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { - - private static final long serialVersionUID = 2603286150183186115L; - private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; - - - /** - * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. - */ - public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig) { - super(); - - if (bootstrapServer == null) { - throw new IllegalArgumentException("bootstrapServer is null"); - } - - if (inputTopic == null) { - throw new IllegalArgumentException("inputTopic is null"); - } - - // Check if boostrap server and inputTopic are defined - if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { - throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); - } - - - reader = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - AvroCoder.of(ActivePowerRecord.class)) - .withConsumerConfigUpdates(consumerConfig) - .withoutMetadata(); - } - - @Override - public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) { - return input.apply(this.reader); - } - -} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java index 732afe9a0c1d4bdfea876025fceea0c5da1310fe..7a48bd71d497f65351888425d092decf5adb05f3 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java @@ -1,6 +1,5 @@ package theodolite.commons.beam.kafka; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer; import titan.ccp.model.records.ActivePowerRecord; /** - * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. - * Has additional a TimestampPolicy. + * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time. */ -public class KafkaActivePowerTimestampReader extends - PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { +public class KafkaActivePowerTimestampReader + extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; - /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig) { + public KafkaActivePowerTimestampReader( + final String bootstrapServer, + final String inputTopic, + final Map<String, Object> consumerConfig) { super(); - // Check if boostrap server and inputTopic are defined + // Check if bootstrap server and inputTopic are defined if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - reader = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - AvroCoder.of(ActivePowerRecord.class)) - .withConsumerConfigUpdates(consumerConfig) - // Set TimeStampPolicy for event time - .withTimestampPolicyFactory( - (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) - .withoutMetadata(); + this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer) + .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class) + .withValueDeserializerAndCoder( + ActivePowerRecordDeserializer.class, + AvroCoder.of(ActivePowerRecord.class)) + .withConsumerConfigUpdates(consumerConfig) + .withTimestampPolicyFactory( + (tp, previousWatermark) -> new EventTimePolicy(previousWatermark)) + .withoutMetadata(); } @Override diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java index 0a3867e71479e36ce30a9f222dfd0a7d473bd209..6d33f6f01493c10a1eb6aca56dd309ae58ce4b8d 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java @@ -1,5 +1,6 @@ package theodolite.commons.beam.kafka; +import java.util.Map; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; @@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; /** - * Wrapper for a Kafka writing Transformation - * where the value type can be generic. + * Wrapper for a Kafka writing Transformation where the value type can be generic. + * * @param <T> type of the value. */ -public class KafkaWriterTransformation<T> extends - PTransform<PCollection<KV<String, T>>, PDone> { +public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> { private static final long serialVersionUID = 3171423303843174723L; private final PTransform<PCollection<KV<String, T>>, PDone> writer; /** - * Creates a new kafka writer transformation. + * Creates a new Kafka writer transformation. */ - public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic, - final Class<? extends Serializer<T>> valueSerializer) { + public KafkaWriterTransformation( + final String bootstrapServer, + final String outputTopic, + final Class<? extends Serializer<T>> valueSerializer) { + this(bootstrapServer, outputTopic, valueSerializer, Map.of()); + } + + /** + * Creates a new Kafka writer transformation. + */ + public KafkaWriterTransformation( + final String bootstrapServer, + final String outputTopic, + final Class<? extends Serializer<T>> valueSerializer, + final Map<String, Object> producerConfig) { super(); - // Check if boostrap server and outputTopic are defined + // Check if bootstrap server and outputTopic are defined if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or outputTopic missing"); } @@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends .withBootstrapServers(bootstrapServer) .withTopic(outputTopic) .withKeySerializer(StringSerializer.class) - .withValueSerializer(valueSerializer); + .withValueSerializer(valueSerializer) + .withProducerConfigUpdates(producerConfig); } diff --git a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs index 66b402b58f39b79066638ce679c27c0378d5be54..174249a98f9d91ce2cbf2bb64b27c09b37f05d9f 100644 --- a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..713419c8d3d74d3bd7fd05c3e839367753fcdee0 100644 --- a/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java index 6d908d34b7c6b87254782b6ae8b0b8dc2a6d036e..0d331a900f5bd5c18dbeaf2fc2a249256151ce70 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java @@ -10,7 +10,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; import com.github.tomakehurst.wiremock.junit.WireMockRule; -import com.google.gson.Gson; import java.net.URI; import org.junit.Before; import org.junit.Rule; @@ -21,8 +20,6 @@ public class HttpRecordSenderTest { private HttpRecordSender<ActivePowerRecord> httpRecordSender; - private Gson gson; - @Rule public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort()); @@ -30,7 +27,6 @@ public class HttpRecordSenderTest { public void setup() { this.httpRecordSender = new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port())); - this.gson = new Gson(); } @Test diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java index 79566fd937b9c100663d426610b6ff476035ef87..251523441e339cbaf58c7e3a1b30e97cc354df18 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java @@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory; /** * Logs all Key Value pairs. */ -@SuppressWarnings({"unused"}) public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { private static final long serialVersionUID = 4328743; private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class); @@ -19,9 +18,7 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { @ProcessElement public void processElement(@Element final KV<String, String> kv, final OutputReceiver<KV<String, String>> out) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue()); - } + LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue()); out.output(kv); } } diff --git a/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc4-beam/build.gradle b/theodolite-benchmarks/uc4-beam/build.gradle index 502e94fa737fb2ae1bab861407b27575cd8766ca..3e9d917cc3586e5df2c5645f1d2cbcf03e3993e4 100644 --- a/theodolite-benchmarks/uc4-beam/build.gradle +++ b/theodolite-benchmarks/uc4-beam/build.gradle @@ -2,4 +2,6 @@ plugins { id 'theodolite.beam' } - +dependencies { + implementation ('io.confluent:kafka-streams-avro-serde:5.3.2') +} diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java index 7b66082c91b87c246d8c834249d2bc82545766f5..cf25f043ecef8c4bc564cff454d6477d69c945aa 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java @@ -16,11 +16,11 @@ import titan.ccp.model.records.ActivePowerRecord; /** * Duplicates the Kv containing the (Children,Parents) pair as a flat map. */ -public class DuplicateAsFlatMap extends DoFn - <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { +public class DuplicateAsFlatMap + extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { private static final long serialVersionUID = -5132355515723961647L; @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD private final PCollectionView<Map<String, Set<String>>> childParentPairMap; public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) { @@ -30,19 +30,20 @@ public class DuplicateAsFlatMap extends DoFn /** - * Generate a KV-pair for every child-parent match. + * Generate a KV-pair for every child-parent match. */ @ProcessElement - public void processElement(@Element final KV<String, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, - final ProcessContext c) { + public void processElement( + @Element final KV<String, ActivePowerRecord> kv, + final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, + @StateId("parents") final ValueState<Set<String>> state, + final ProcessContext c) { final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); final Set<String> newParents = - c.sideInput(childParentPairMap).get(kv.getKey()) == null + c.sideInput(this.childParentPairMap).get(kv.getKey()) == null ? Collections.emptySet() - : c.sideInput(childParentPairMap).get(kv.getKey()); + : c.sideInput(this.childParentPairMap).get(kv.getKey()); final Set<String> oldParents = MoreObjects.firstNonNull(state.read(), Collections.emptySet()); // Forward new Pairs if they exist diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index 7179fe5da937280d5baf72cd73cc392ef15a60e0..0c63e6f9322e5f70f1ad010de168f1a5292a45a4 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -66,8 +66,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { final Duration gracePeriod = Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS)); - // Build kafka configuration - final Map<String, Object> consumerConfig = this.buildConsumerConfig(); + // Build Kafka configuration + final Map<String, Object> consumerConfig = super.buildConsumerConfig(); final Map<String, Object> configurationConfig = this.configurationConfig(config); // Set Coders for Classes that will be distributed @@ -77,25 +77,34 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Read from Kafka // ActivePowerRecords final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = - new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); + new KafkaActivePowerTimestampReader( + this.bootstrapServer, + this.inputTopic, + consumerConfig); // Configuration Events final KafkaGenericReader<Event, String> kafkaConfigurationReader = new KafkaGenericReader<>( - this.bootstrapServer, configurationTopic, configurationConfig, - EventDeserializer.class, StringDeserializer.class); - - // Transform into AggregatedActivePowerRecords into ActivePowerRecords - final AggregatedToActive aggregatedToActive = new AggregatedToActive(); + this.bootstrapServer, + configurationTopic, + configurationConfig, + EventDeserializer.class, + StringDeserializer.class); // Write to Kafka final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput = new KafkaWriterTransformation<>( - this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class); + this.bootstrapServer, + outputTopic, + AggregatedActivePowerRecordSerializer.class, + super.buildProducerConfig()); final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback = new KafkaWriterTransformation<>( - this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class); + this.bootstrapServer, + feedbackTopic, + AggregatedActivePowerRecordSerializer.class, + super.buildProducerConfig()); // Apply pipeline transformations final PCollection<KV<String, ActivePowerRecord>> values = this @@ -115,7 +124,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .withBootstrapServers(this.bootstrapServer) .withTopic(feedbackTopic) .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class) + .withValueDeserializerAndCoder( + AggregatedActivePowerRecordDeserializer.class, + AvroCoder.of(AggregatedActivePowerRecord.class)) + .withConsumerConfigUpdates(consumerConfig) .withTimestampPolicyFactory( (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy( previousWaterMark)) @@ -123,11 +135,12 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .apply("Apply Windows", Window.into(FixedWindows.of(duration))) // Convert into the correct data format .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", - MapElements.via(aggregatedToActive)) + MapElements.via(new AggregatedToActive())) .apply("Set trigger for feedback", Window .<KV<String, ActivePowerRecord>>configure() .triggering(Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() + AfterProcessingTime + .pastFirstElementInPane() .plusDelayOf(triggerDelay))) .withAllowedLateness(gracePeriod) .discardingFiredPanes()); @@ -170,17 +183,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .accumulatingFiredPanes()) .apply(View.asMap()); - final FilterNullValues filterNullValues = new FilterNullValues(); - // Build pairs of every sensor reading and parent final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues = inputCollection.apply( "Duplicate as flatMap", - ParDo.of(new DuplicateAsFlatMap(childParentPairMap)) - .withSideInputs(childParentPairMap)) + ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap)) .apply("Filter only latest changes", Latest.perKey()) - .apply("Filter out null values", - Filter.by(filterNullValues)); + .apply("Filter out null values", Filter.by(new FilterNullValues())); final SetIdForAggregated setIdForAggregated = new SetIdForAggregated(); final SetKeyToGroup setKeyToGroup = new SetKeyToGroup(); @@ -204,8 +213,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { aggregations.apply("Write to aggregation results", kafkaOutput); - aggregations - .apply("Write to feedback topic", kafkaFeedback); + aggregations.apply("Write to feedback topic", kafkaFeedback); } @@ -217,14 +225,15 @@ public final class Uc4BeamPipeline extends AbstractPipeline { */ public Map<String, Object> configurationConfig(final Configuration config) { final Map<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + consumerConfig.put( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - config - .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); - - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config - .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration"); + consumerConfig.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + consumerConfig.put( + ConsumerConfig.GROUP_ID_CONFIG, config + .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration"); return consumerConfig; } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java index d2b484f5ab30be63f311d6dbcf495baebbd5e2b4..28f3d70b4a19756454fe4564b6f85599e7f17777 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java @@ -11,8 +11,7 @@ import org.apache.beam.sdk.coders.CoderException; import titan.ccp.model.records.AggregatedActivePowerRecord; /** - * Wrapper Class that encapsulates a AggregatedActivePowerRecord Serde in a - * org.apache.beam.sdk.coders.Coder. + * {@link Coder} for an {@link AggregatedActivePowerRecord}. */ @SuppressWarnings("serial") public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord> @@ -51,7 +50,7 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe @Override public void verifyDeterministic() throws NonDeterministicException { if (!DETERMINISTIC) { - throw new NonDeterministicException(this, "This class should be deterministic!"); + throw new NonDeterministicException(this, "This class should be deterministic."); } } } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java index 6e2f2765ff65d3bca2a127be36db0854f15afebc..3076861a53dac031afd9e8eb913b5a0bafe480c0 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java @@ -1,34 +1,12 @@ package serialization; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import org.apache.beam.sdk.coders.AvroCoder; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import org.apache.kafka.common.serialization.Deserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** - * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Deserializer + * {@link Deserializer} for an {@link AggregatedActivePowerRecord}. */ public class AggregatedActivePowerRecordDeserializer - implements Deserializer<AggregatedActivePowerRecord> { - - private static final Logger LOGGER = - LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class); - - private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder = - AvroCoder.of(AggregatedActivePowerRecord.class); - - @Override - public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) { - AggregatedActivePowerRecord value = null; - try { - value = this.avroEnCoder.decode(new ByteArrayInputStream(data)); - } catch (final IOException e) { - LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e); - } - return value; - } - + extends SpecificAvroDeserializer<AggregatedActivePowerRecord> { } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java index 77b79d5465f1d561870bf5b04f8fa20f87076adb..26801d8a28b9756214c65c4e8190e15d04bb3e68 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java @@ -1,45 +1,12 @@ package serialization; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import org.apache.beam.sdk.coders.AvroCoder; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** - * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Serializer + * {@link Serializer} for an {@link AggregatedActivePowerRecord}. */ public class AggregatedActivePowerRecordSerializer - implements Serializer<AggregatedActivePowerRecord> { - - private static final Logger LOGGER = - LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class); - - private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder = - AvroCoder.of(AggregatedActivePowerRecord.class); - - @Override - public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - try { - this.avroEnCoder.encode(data, out); - } catch (final IOException e) { - LOGGER.error("Could not serialize AggregatedActivePowerRecord", e); - } - final byte[] result = out.toByteArray(); - try { - out.close(); - } catch (final IOException e) { - LOGGER.error( - "Could not close output stream after serialization of AggregatedActivePowerRecord", e); - } - return result; - } - - @Override - public void close() { - Serializer.super.close(); - } + extends SpecificAvroSerializer<AggregatedActivePowerRecord> { } diff --git a/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs index 272e01533f6a345d53d2635c47e38c6d3c33dc8a..08fcb07933ca19165976bffd5e7fdfdaf64ee1d2 100644 --- a/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644 --- a/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644 --- a/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=21 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99