Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: she/theodolite
Commits on Source (29)
Showing 298 additions and 150 deletions
@@ -33,10 +33,16 @@ default:
   script:
     - mkdir -p /kaniko/.docker
     - echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
-    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
-    - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\""
-    - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\""
-    - "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\""
+    - >
+      if [ $IMAGE_TAG ]; then
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$IMAGE_TAG"
+      elif [ $CI_COMMIT_TAG ]; then
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
+      else
+        DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
+      fi
     - "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\""
     - /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D
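To make the new tag-selection precedence concrete, here is a minimal standalone sketch of the same logic (shell only, outside CI; registry, organization, and image names are illustrative — in the pipeline these values come from GitLab CI variables):

```sh
#!/bin/sh
# Sketch of the tag selection introduced above; not the CI job itself.
CR_HOST=registry.example.com; CR_ORG=she; IMAGE_NAME=theodolite  # illustrative
IMAGE_TAG=""                  # explicit tag override, e.g. "20.10.12"
CI_COMMIT_TAG=""              # set on tag pipelines, e.g. "v0.7.0"
CI_COMMIT_REF_SLUG=my-branch  # branch slug; "master" drops the prefix
CI_COMMIT_SHORT_SHA=abc1234

if [ "$IMAGE_TAG" ]; then
  KANIKO_D="-d $CR_HOST/$CR_ORG/$IMAGE_NAME:$IMAGE_TAG"
elif [ "$CI_COMMIT_TAG" ]; then
  KANIKO_D="-d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
else
  DOCKER_TAG_NAME=$(echo "$CI_COMMIT_REF_SLUG-" | sed 's/^master-$//')
  KANIKO_D="-d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
  KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
fi
echo "$KANIKO_D"
# -> -d registry.example.com/she/theodolite:my-branch-latest
#    -d registry.example.com/she/theodolite:my-branch-abc1234
```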
@@ -73,26 +79,37 @@ test-docs-links:
     - build-docs
   script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site

+build-docs-crds:
+  stage: build
+  image:
+    name: ghcr.io/fybrik/crdoc:0.6.1
+    entrypoint: [""]
+  script: /crdoc --resources theodolite/crd/ --template docs/api-reference/crds.tmpl --output docs/api-reference/crds.ref.md
+  artifacts:
+    paths:
+      - docs/api-reference/crds.ref.md
+    expire_in: 1 week
+  rules:
+    - changes:
+        - docs/api-reference/crds.tmpl
+        - theodolite/crd/**/*
+    - when: manual
+      allow_failure: true
+
 test-docs-crds-regression:
   stage: test
-  image: golang
+  needs:
+    - build-docs-crds
+  image: alpine:3.15
   before_script:
     - cd docs
-    - go install fybrik.io/crdoc@latest
   script:
-    - crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --output api-reference/crds.ref.md
     - cmp api-reference/crds.md api-reference/crds.ref.md
   artifacts:
     when: on_failure
     paths:
      - docs/api-reference/crds.ref.md
    expire_in: 1 week
+  rules:
+    - changes:
+        - docs/api-reference/crds.tmpl
+        - theodolite/crd/**/*
+    - when: manual
+      allow_failure: true
# Theodolite Helm Chart

@@ -104,6 +121,11 @@ lint-helm:
     name: alpine/helm:3.5.2
     entrypoint: [""]
   script: helm lint helm/
+  rules:
+    - changes:
+        - helm/*
+    - when: manual
+      allow_failure: true
# Theodolite Benchmarks

@@ -367,6 +389,11 @@ deploy-uc4-load-generator:
   before_script:
     - export GRADLE_USER_HOME=`pwd`/.gradle
     - cd theodolite
+  rules:
+    - changes:
+        - theodolite/**/*
+    - when: manual
+      allow_failure: true

 build-theodolite-jvm:
   stage: build
@@ -567,4 +594,22 @@ deploy-random-scheduler:
     - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
       when: manual
       allow_failure: true
\ No newline at end of file
+
+deploy-buildimage-docker-compose-jq:
+  stage: deploy
+  extends:
+    - .kaniko-push
+  needs: []
+  variables:
+    DOCKER_VERSION: 20.10.12
+    IMAGE_NAME: theodolite-build-docker-compose-jq
+    IMAGE_TAG: $DOCKER_VERSION
+  before_script:
+    - cd buildimages/docker-compose-jq
+  rules:
+    - changes:
+        - buildimages/docker-compose-jq/Dockerfile
+      if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
+    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_PIPELINE_SOURCE == 'web'"
+      when: manual
+      allow_failure: true
FROM docker:${DOCKER_VERSION:-latest}
RUN apk update && \
apk add jq && \
apk add py-pip python3-dev libffi-dev openssl-dev gcc libc-dev rust cargo make && \
pip install docker-compose
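For a quick local smoke test of the image above, something along these lines should work (the tag and verification commands are illustrative; in CI the image is built and pushed by the deploy-buildimage-docker-compose-jq job via Kaniko, and without a DOCKER_VERSION build argument the FROM line appears to fall back to docker:latest):

```sh
# Build from the buildimages/docker-compose-jq directory and check both tools.
docker build -t theodolite-build-docker-compose-jq buildimages/docker-compose-jq
docker run --rm theodolite-build-docker-compose-jq \
  sh -c 'docker-compose version && jq --version'
```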
@@ -39,5 +39,5 @@ crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --output api-reference/crds.md

 With the following command, crdoc is executed in Docker:

 ```sh
-docker run --rm -v "`pwd`/../theodolite/crd/":/crd -u $UID -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.0 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
+docker run --rm -v "`pwd`/../theodolite/crd/":/crd -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.1 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
 ```
@@ -115,15 +115,13 @@ If a benchmark is [executed by an Execution](running-benchmarks), these patchers

 ## Kafka Configuration

-Theodolite allows to automatically create and remove Kafka topics for each SLO experiment.
-Use the `removeOnly: True` property for topics which are created automatically by the SUT.
-For those topics, also wildcards are allowed in the topic name.
-
-If no Kafka topics should be created, simply set:
-
-```yaml
-kafkaConfig: []
-```
+Theodolite allows you to automatically create and remove Kafka topics for each SLO experiment by setting a `kafkaConfig`.
+Its `bootstrapServer` needs to point to your Kafka cluster, and `topics` configures the list of Kafka topics to be created/removed.
+For each topic, you configure its name, the number of partitions, and the replication factor.
+
+With the `removeOnly: True` property, you can also instruct Theodolite to only remove topics and not create them.
+This is useful when benchmarking SUTs that create topics on their own (e.g., Kafka Streams and Samza applications).
+For those topics, wildcards are also allowed in the topic name and, of course, no partition count or replication factor needs to be provided.

 <!-- Further information: API Reference -->
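To illustrate the new description, a minimal `kafkaConfig` could look like the following sketch (the bootstrap server address and topic names are illustrative, and the exact field names for partition count and replication factor are assumptions based on the prose above, not taken from the API reference):

```yaml
kafkaConfig:
  bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092"  # assumed service address
  topics:
    - name: input               # created before and removed after each experiment
      numPartitions: 40         # field name assumed
      replicationFactor: 1      # field name assumed
    - name: theodolite-.*       # wildcard for topics the SUT creates itself
      removeOnly: True          # never created, only removed afterwards
```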
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
\ No newline at end of file
@@ -13,21 +13,19 @@ repositories {
 }

 dependencies {
-  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
   implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
   implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
-  implementation 'com.google.code.gson:gson:2.8.2'
-  implementation 'com.google.guava:guava:24.1-jre'
-  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
     exclude group: 'org.apache.kafka', module: 'kafka-clients'
   }
-  implementation ('io.confluent:kafka-streams-avro-serde:5.3.2')
   implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
   runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'

-  // Use JUnit test framework
   testImplementation 'junit:junit:4.12'
 }
@@ -12,6 +12,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  */
 public class AbstractPipeline extends Pipeline {

+  private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
+  private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
+
   protected final String inputTopic;
   protected final String bootstrapServer;
   // Application Configurations
@@ -21,8 +24,8 @@ public class AbstractPipeline extends Pipeline {
     super(options);
     this.config = config;

-    inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
   }

   /**
@@ -32,19 +35,37 @@ public class AbstractPipeline extends Pipeline {
    */
   public Map<String, Object> buildConsumerConfig() {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put("schema.registry.url",
-        config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
-    consumerConfig.put("specific.avro.reader",
-        config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
-    final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    consumerConfig.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        this.config.getString(ConfigurationKeys.APPLICATION_NAME));
     return consumerConfig;
   }

+  /**
+   * Builds a simple configuration for a Kafka producer transformation.
+   *
+   * @return the built configuration.
+   */
+  public Map<String, Object> buildProducerConfig() {
+    final Map<String, Object> config = new HashMap<>();
+    config.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    config.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    return config;
+  }
 }
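The new `buildProducerConfig()` pairs with the four-argument `KafkaWriterTransformation` constructor added further below in this compare. A minimal sketch of how a concrete pipeline might wire the two together, assuming the project classes are on the classpath (package names, bootstrap server, topic, and serializer are illustrative, not taken from the diff):

```java
package theodolite.commons.beam.example; // hypothetical package

import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;
import theodolite.commons.beam.AbstractPipeline; // assumed package of AbstractPipeline
import theodolite.commons.beam.kafka.KafkaWriterTransformation;

public final class ProducerConfigExample {

  private ProducerConfigExample() {}

  /** Writes string results to Kafka, passing the pipeline's producer config. */
  public static void writeResults(
      final AbstractPipeline pipeline,
      final PCollection<KV<String, String>> results) {
    results.apply(new KafkaWriterTransformation<>(
        "localhost:9092", // illustrative bootstrap server
        "output", // illustrative topic
        StringSerializer.class,
        pipeline.buildProducerConfig())); // Schema Registry / Avro settings
  }
}
```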
package theodolite.commons.beam.kafka;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}.
*/
public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> {
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}.
*/
public class KafkaActivePowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig) {
super();
if (bootstrapServer == null) {
throw new IllegalArgumentException("bootstrapServer is null");
}
if (inputTopic == null) {
throw new IllegalArgumentException("inputTopic is null");
}
    // Check if bootstrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
}
 package theodolite.commons.beam.kafka;

-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import titan.ccp.model.records.ActivePowerRecord;

 /**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
- * Has additional a TimestampPolicy.
+ * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
  */
-public class KafkaActivePowerTimestampReader extends
-    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
+public class KafkaActivePowerTimestampReader
+    extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {

   private static final long serialVersionUID = 2603286150183186115L;
   private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;

   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
-  public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
-      final Map<String, Object> consumerConfig) {
+  public KafkaActivePowerTimestampReader(
+      final String bootstrapServer,
+      final String inputTopic,
+      final Map<String, Object> consumerConfig) {
     super();

-    // Check if boostrap server and inputTopic are defined
+    // Check if bootstrap server and inputTopic are defined
     if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }

-    reader =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimeStampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
+    this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
+        .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializerAndCoder(
+            ActivePowerRecordDeserializer.class,
+            AvroCoder.of(ActivePowerRecord.class))
+        .withConsumerConfigUpdates(consumerConfig)
+        .withTimestampPolicyFactory(
+            (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
+        .withoutMetadata();
   }

   @Override
 package theodolite.commons.beam.kafka;

+import java.util.Map;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;

 /**
- * Wrapper for a Kafka writing Transformation
- * where the value type can be generic.
+ * Wrapper for a Kafka writing Transformation where the value type can be generic.
+ *
  * @param <T> type of the value.
  */
-public class KafkaWriterTransformation<T> extends
-    PTransform<PCollection<KV<String, T>>, PDone> {
+public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {

   private static final long serialVersionUID = 3171423303843174723L;
   private final PTransform<PCollection<KV<String, T>>, PDone> writer;

   /**
-   * Creates a new kafka writer transformation.
+   * Creates a new Kafka writer transformation.
    */
-  public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
-      final Class<? extends Serializer<T>> valueSerializer) {
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer) {
+    this(bootstrapServer, outputTopic, valueSerializer, Map.of());
+  }
+
+  /**
+   * Creates a new Kafka writer transformation.
+   */
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer,
+      final Map<String, Object> producerConfig) {
     super();
-    // Check if boostrap server and outputTopic are defined
+    // Check if bootstrap server and outputTopic are defined
     if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
     }
@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
         .withBootstrapServers(bootstrapServer)
         .withTopic(outputTopic)
         .withKeySerializer(StringSerializer.class)
-        .withValueSerializer(valueSerializer);
+        .withValueSerializer(valueSerializer)
+        .withProducerConfigUpdates(producerConfig);
   }

@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -10,7 +10,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
 import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import com.google.gson.Gson;
 import java.net.URI;
 import org.junit.Before;
 import org.junit.Rule;
@@ -21,8 +20,6 @@ public class HttpRecordSenderTest {

   private HttpRecordSender<ActivePowerRecord> httpRecordSender;

-  private Gson gson;
-
   @Rule
   public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());

@@ -30,7 +27,6 @@ public class HttpRecordSenderTest {
   public void setup() {
     this.httpRecordSender =
         new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port()));
-    this.gson = new Gson();
   }

   @Test
@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;

 /**
  * Logs all Key Value pairs.
  */
-@SuppressWarnings({"unused"})
 public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
   private static final long serialVersionUID = 4328743;
   private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
@@ -19,9 +18,7 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
   @ProcessElement
   public void processElement(@Element final KV<String, String> kv,
       final OutputReceiver<KV<String, String>> out) {
-    if (LOGGER.isInfoEnabled()) {
-      LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
-    }
+    LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
     out.output(kv);
   }
 }
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99