diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index b91f47cfada9207118393ba33fcba65ab446faa5..debfd967ea956f94b939d686b87ce6d29254e449 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -20,7 +20,7 @@ public class AbstractBeamService { // Beam Pipeline protected PipelineOptions options; - public AbstractBeamService(String[] args) { + public AbstractBeamService(final String[] args) { //NOPMD options = PipelineOptionsFactory.fromArgs(args).create(); options.setJobName(APPLICATION_NAME); } @@ -29,13 +29,7 @@ public class AbstractBeamService { /** * Abstract main for a Beam Service. */ - public static void main(final String[] args) { - AbstractBeamService service = new AbstractBeamService(args); - service.run(); - } - - public void run() { - } + public static void main(final String[] args){} //NOPMD /** * Builds a simple configuration for a Kafka consumer. @@ -43,7 +37,7 @@ public class AbstractBeamService { * @return the build Kafka consumer configuration. */ public Properties buildConsumerConfig() { - Properties consumerConfig = new Properties(); + final Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java index 5976838d9fbe2ce3b3bc7f57df1e96f9ef8820f9..c919878046af0ab71e19f1ff33bd6d6b5f82c9ba 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java @@ -1,9 +1,7 @@ package theodolite.commons.beam.kafka; import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Map; import java.util.Properties; - import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -19,15 +17,17 @@ import titan.ccp.model.records.ActivePowerRecord; public class KafkaAggregatedPowerRecordReader extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { + private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; + /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ @SuppressWarnings({"unchecked", "rawtypes"}) - public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic, - Properties consumerConfig) { + public KafkaAggregatedPowerRecordReader(final String bootstrapServer, final String inputTopic, + final Properties consumerConfig) { super(); // Check if boostrap server and inputTopic are defined @@ -47,7 +47,7 @@ public class KafkaAggregatedPowerRecordReader extends } @Override - public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) { + public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) { return input.apply(this.reader); } diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java index b4e1a4cef0063dd29f3bf86ea54a5bba62bf2dbf..e8707fcdfb8799eb987ec18394fa24c32a786cee 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java @@ -1,13 +1,11 @@ package application; import com.google.gson.Gson; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Properties; import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -16,7 +14,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.beam.AbstractBeamService; @@ -24,6 +21,7 @@ import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader; import titan.ccp.model.records.ActivePowerRecord; + /** * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload @@ -50,9 +48,8 @@ public final class Uc1ApplicationBeam extends AbstractBeamService { /** * Main method. - * */ - @SuppressWarnings({"unchecked", "rawtypes","unused"}) + @SuppressWarnings({"unchecked", "rawtypes", "unused"}) public static void main(final String[] args) { final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);