Skip to content
Snippets Groups Projects
Commit a7bbf1c9 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix pmd and checkstyle uc1+uc2 beam

parent a11881e7
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
......@@ -11,10 +11,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -29,9 +26,7 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public final class Uc1BeamPipeline extends AbstractPipeline {
private static final Logger LOGGER = LoggerFactory.getLogger(Uc1BeamPipeline.class);
Uc1BeamPipeline(PipelineOptions options, Configuration config) {
protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
super(options, config);
// Set Coders for Classes that will be distributed
......
......@@ -38,17 +38,17 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public final class Uc2BeamPipeline extends AbstractPipeline {
protected Uc2BeamPipeline(PipelineOptions options, Configuration config) {
protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) {
super(options, config);
// Additional needed variables
String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDurationMinutes = Integer.parseInt(
config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
final Duration duration = Duration.standardMinutes(windowDurationMinutes);
// Build kafka configuration
Properties consumerConfig = buildConsumerConfig();
final Properties consumerConfig = buildConsumerConfig();
// Set Coders for Classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment