diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index f43f508906eb17044bb21f0db41c4f6f365bf57a..dc289826352b11c70c5a2858af6d1bcd9f41ff5e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -11,10 +11,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import theodolite.commons.beam.AbstractPipeline; -import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; import titan.ccp.model.records.ActivePowerRecord; @@ -29,9 +26,7 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc1BeamPipeline extends AbstractPipeline { - private static final Logger LOGGER = LoggerFactory.getLogger(Uc1BeamPipeline.class); - - Uc1BeamPipeline(PipelineOptions options, Configuration config) { + protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); // Set Coders for Classes that will be distributed diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index 78988e4cb49542fac3d1f5ff99fd9575c45e165b..fa7d455e3625309843cc055a39721873f8d51d8f 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -38,17 +38,17 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc2BeamPipeline extends AbstractPipeline { - protected Uc2BeamPipeline(PipelineOptions options, Configuration config) { + protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) { super(options, config); // Additional needed variables - String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final int windowDurationMinutes = Integer.parseInt( config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - Properties consumerConfig = buildConsumerConfig(); + final Properties consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry();