Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: she/theodolite
Commits on Source (29), showing 298 additions and 150 deletions
......@@ -33,10 +33,16 @@ default:
script:
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\""
- "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\""
- "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\""
- >
  if [ $IMAGE_TAG ]; then
    KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$IMAGE_TAG"
  elif [ $CI_COMMIT_TAG ]; then
    KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
  else
    DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
    KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
    KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
  fi
- "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\""
- /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D
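After this change, the destination tags for Kaniko are resolved in priority order: an explicit `IMAGE_TAG` wins, then a git tag (`CI_COMMIT_TAG`), and otherwise tags are derived from the branch name. A minimal sketch of the branch-derived naming (the branch name and SHA here are made up):

```sh
CI_COMMIT_REF_SLUG=feature-x
CI_COMMIT_SHORT_SHA=abc1234
DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')  # "feature-x-"; empty on master
echo "${DOCKER_TAG_NAME}latest"              # feature-x-latest  (on master: latest)
echo "$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"  # feature-x-abc1234 (on master: abc1234)
```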
......@@ -73,26 +79,37 @@ test-docs-links:
- build-docs
script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site
build-docs-crds:
stage: build
image:
name: ghcr.io/fybrik/crdoc:0.6.1
entrypoint: [""]
script: /crdoc --resources theodolite/crd/ --template docs/api-reference/crds.tmpl --output docs/api-reference/crds.ref.md
artifacts:
paths:
- docs/api-reference/crds.ref.md
expire_in: 1 week
rules:
- changes:
- docs/api-reference/crds.tmpl
- theodolite/crd/**/*
- when: manual
allow_failure: true
test-docs-crds-regression:
stage: test
image: golang
needs:
- build-docs-crds
image: alpine:3.15
before_script:
- cd docs
- go install fybrik.io/crdoc@latest
script:
- crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --output api-reference/crds.ref.md
- cmp api-reference/crds.md api-reference/crds.ref.md
artifacts:
when: on_failure
paths:
- docs/api-reference/crds.ref.md
expire_in: 1 week
rules:
- changes:
- docs/api-reference/crds.tmpl
- theodolite/crd/**/*
- when: manual
allow_failure: true
# Theodolite Helm Chart
......@@ -104,6 +121,11 @@ lint-helm:
name: alpine/helm:3.5.2
entrypoint: [""]
script: helm lint helm/
rules:
- changes:
- helm/*
- when: manual
allow_failure: true
# Theodolite Benchmarks
......@@ -367,6 +389,11 @@ deploy-uc4-load-generator:
before_script:
- export GRADLE_USER_HOME=`pwd`/.gradle
- cd theodolite
rules:
- changes:
- theodolite/**/*
- when: manual
allow_failure: true
build-theodolite-jvm:
stage: build
......@@ -567,4 +594,22 @@ deploy-random-scheduler:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
\ No newline at end of file
deploy-buildimage-docker-compose-jq:
stage: deploy
extends:
- .kaniko-push
needs: []
variables:
DOCKER_VERSION: 20.10.12
IMAGE_NAME: theodolite-build-docker-compose-jq
IMAGE_TAG: $DOCKER_VERSION
before_script:
- cd buildimages/docker-compose-jq
rules:
- changes:
- buildimages/docker-compose-jq/Dockerfile
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_PIPELINE_SOURCE == 'web'"
when: manual
allow_failure: true
FROM docker:${DOCKER_VERSION:-latest}
RUN apk update && \
apk add jq && \
apk add py-pip python3-dev libffi-dev openssl-dev gcc libc-dev rust cargo make && \
pip install docker-compose
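This image is what the `deploy-buildimage-docker-compose-jq` job above publishes; other jobs could then reference it, for instance as `image: $CR_HOST/$CR_ORG/theodolite-build-docker-compose-jq:20.10.12` (the exact reference depends on the registry variables).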
......@@ -39,5 +39,5 @@ crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --outpu
With the following command, crdoc is executed in Docker:
```sh
docker run --rm -v "`pwd`/../theodolite/crd/":/crd -u $UID -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.0 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
docker run --rm -v "`pwd`/../theodolite/crd/":/crd -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.1 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
```
......@@ -115,15 +115,13 @@ If a benchmark is [executed by an Execution](running-benchmarks), these patchers
## Kafka Configuration
Theodolite allows to automatically create and remove Kafka topics for each SLO experiment.
Use the `removeOnly: True` property for topics which are created automatically by the SUT.
For those topics, also wildcards are allowed in the topic name.
Theodolite can automatically create and remove Kafka topics for each SLO experiment if a `kafkaConfig` is set.
Its `bootstrapServer` needs to point to your Kafka cluster and `topics` configures the list of Kafka topics to be created/removed.
For each topic, you configure its name, the number of partitions and the replication factor.
If no Kafka topics should be created, simply set:
```yaml
kafkaConfig: []
```
With the `removeOnly: True` property, you can also instruct Theodolite to only remove topics and not create them.
This is useful when benchmarking SUTs that create topics on their own (e.g., Kafka Streams and Samza applications).
For those topics, wildcards are also allowed in the topic name and, of course, no partition count or replication factor may be provided.
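For example, a benchmark's `kafkaConfig` might look as follows (a sketch; topic names, counts, and the exact property spellings such as `numPartitions`/`replicationFactor` are illustrative here):

```yaml
kafkaConfig:
  bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092"
  topics:
    - name: "input"
      numPartitions: 40
      replicationFactor: 1
    - name: "theodolite-.*" # wildcard; created by the SUT, only removed by Theodolite
      removeOnly: True
```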
<!-- Further information: API Reference -->
......
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
\ No newline at end of file
......@@ -13,21 +13,19 @@ repositories {
}
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
implementation ('io.confluent:kafka-streams-avro-serde:5.3.2')
implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
......@@ -12,6 +12,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
*/
public class AbstractPipeline extends Pipeline {
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
protected final String inputTopic;
protected final String bootstrapServer;
// Application Configurations
......@@ -21,8 +24,8 @@ public class AbstractPipeline extends Pipeline {
super(options);
this.config = config;
inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
}
/**
......@@ -32,19 +35,37 @@ public class AbstractPipeline extends Pipeline {
*/
public Map<String, Object> buildConsumerConfig() {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
config
.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put("schema.registry.url",
config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put("specific.avro.reader",
config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG,
this.config.getString(ConfigurationKeys.APPLICATION_NAME));
return consumerConfig;
}
/**
* Builds a simple configuration for a Kafka producer transformation.
*
* @return the built configuration.
*/
public Map<String, Object> buildProducerConfig() {
final Map<String, Object> config = new HashMap<>();
config.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
config.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
return config;
}
}
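For illustration, a concrete pipeline could feed these configurations into the Kafka transformations roughly as follows (a sketch, not code from this change; `readInput` is a hypothetical helper in a subclass of `AbstractPipeline`):

```java
// Sketch: read the input topic using the consumer config built above.
protected PCollection<KV<String, ActivePowerRecord>> readInput() {
  return this.apply(new KafkaActivePowerTimestampReader(
      this.bootstrapServer,
      this.inputTopic,
      this.buildConsumerConfig()));
}
```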
package theodolite.commons.beam.kafka;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}.
*/
public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> {
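// Intentionally empty: this concrete subclass gives KafkaIO a properly typed
// Class token, avoiding the raw (Class) cast needed with KafkaAvroDeserializer.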
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}.
*/
public class KafkaActivePowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig) {
super();
if (bootstrapServer == null) {
throw new IllegalArgumentException("bootstrapServer is null");
}
if (inputTopic == null) {
throw new IllegalArgumentException("inputTopic is null");
}
// Check if bootstrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
......@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
* Has additional a TimestampPolicy.
* Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
*/
public class KafkaActivePowerTimestampReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
public class KafkaActivePowerTimestampReader
extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig) {
public KafkaActivePowerTimestampReader(
final String bootstrapServer,
final String inputTopic,
final Map<String, Object> consumerConfig) {
super();
// Check if boostrap server and inputTopic are defined
// Check if bootstrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
// Set TimeStampPolicy for event time
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
.withoutMetadata();
this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
.withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(
ActivePowerRecordDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withTimestampPolicyFactory(
(tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
.withoutMetadata();
}
@Override
......
package theodolite.commons.beam.kafka;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
......@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* Wrapper for a Kafka writing Transformation
* where the value type can be generic.
* Wrapper for a Kafka writing Transformation where the value type can be generic.
*
* @param <T> type of the value.
*/
public class KafkaWriterTransformation<T> extends
PTransform<PCollection<KV<String, T>>, PDone> {
public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L;
private final PTransform<PCollection<KV<String, T>>, PDone> writer;
/**
* Creates a new kafka writer transformation.
* Creates a new Kafka writer transformation.
*/
public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer) {
public KafkaWriterTransformation(
final String bootstrapServer,
final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer) {
this(bootstrapServer, outputTopic, valueSerializer, Map.of());
}
/**
* Creates a new Kafka writer transformation.
*/
public KafkaWriterTransformation(
final String bootstrapServer,
final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer,
final Map<String, Object> producerConfig) {
super();
// Check if boostrap server and outputTopic are defined
// Check if bootstrap server and outputTopic are defined
if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
}
......@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(valueSerializer);
.withValueSerializer(valueSerializer)
.withProducerConfigUpdates(producerConfig);
}
......
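For illustration, the new overload could be used roughly like this (a sketch; the producer setting, the topic name, and the `results` collection of type `PCollection<KV<String, String>>` are assumptions, not code from this change):

```java
// Sketch: write a keyed String stream with an explicit producer configuration.
final Map<String, Object> producerConfig = Map.of("compression.type", "gzip");
results.apply(new KafkaWriterTransformation<>(
    bootstrapServer,
    "output", // hypothetical output topic
    StringSerializer.class,
    producerConfig));
```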
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -10,7 +10,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.google.gson.Gson;
import java.net.URI;
import org.junit.Before;
import org.junit.Rule;
......@@ -21,8 +20,6 @@ public class HttpRecordSenderTest {
private HttpRecordSender<ActivePowerRecord> httpRecordSender;
private Gson gson;
@Rule
public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
......@@ -30,7 +27,6 @@ public class HttpRecordSenderTest {
public void setup() {
this.httpRecordSender =
new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port()));
this.gson = new Gson();
}
@Test
......
......@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;
/**
* Logs all Key Value pairs.
*/
@SuppressWarnings({"unused"})
public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
private static final long serialVersionUID = 4328743;
private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
......@@ -19,9 +18,7 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
}
LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
out.output(kv);
}
}
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......