Skip to content
Snippets Groups Projects
Commit df842778 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Small code quality fixes

parent ea2b36c6
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
...@@ -20,7 +20,7 @@ public class AbstractBeamService { ...@@ -20,7 +20,7 @@ public class AbstractBeamService {
// Beam Pipeline // Beam Pipeline
protected PipelineOptions options; protected PipelineOptions options;
public AbstractBeamService(String[] args) { public AbstractBeamService(final String[] args) { //NOPMD
options = PipelineOptionsFactory.fromArgs(args).create(); options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(APPLICATION_NAME); options.setJobName(APPLICATION_NAME);
} }
...@@ -29,13 +29,7 @@ public class AbstractBeamService { ...@@ -29,13 +29,7 @@ public class AbstractBeamService {
/** /**
* Abstract main for a Beam Service. * Abstract main for a Beam Service.
*/ */
public static void main(final String[] args) { public static void main(final String[] args){} //NOPMD
AbstractBeamService service = new AbstractBeamService(args);
service.run();
}
public void run() {
}
/** /**
* Builds a simple configuration for a Kafka consumer. * Builds a simple configuration for a Kafka consumer.
...@@ -43,7 +37,7 @@ public class AbstractBeamService { ...@@ -43,7 +37,7 @@ public class AbstractBeamService {
* @return the build Kafka consumer configuration. * @return the build Kafka consumer configuration.
*/ */
public Properties buildConsumerConfig() { public Properties buildConsumerConfig() {
Properties consumerConfig = new Properties(); final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
......
package theodolite.commons.beam.kafka; package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
...@@ -19,15 +17,17 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -19,15 +17,17 @@ import titan.ccp.model.records.ActivePowerRecord;
public class KafkaAggregatedPowerRecordReader extends public class KafkaAggregatedPowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic, public KafkaAggregatedPowerRecordReader(final String bootstrapServer, final String inputTopic,
Properties consumerConfig) { final Properties consumerConfig) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if boostrap server and inputTopic are defined
...@@ -47,7 +47,7 @@ public class KafkaAggregatedPowerRecordReader extends ...@@ -47,7 +47,7 @@ public class KafkaAggregatedPowerRecordReader extends
} }
@Override @Override
public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) { public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader); return input.apply(this.reader);
} }
......
package application; package application;
import com.google.gson.Gson; import com.google.gson.Gson;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
...@@ -16,7 +14,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction; ...@@ -16,7 +14,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.beam.AbstractBeamService; import theodolite.commons.beam.AbstractBeamService;
...@@ -24,6 +21,7 @@ import theodolite.commons.beam.ConfigurationKeys; ...@@ -24,6 +21,7 @@ import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader; import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
* execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
...@@ -50,9 +48,8 @@ public final class Uc1ApplicationBeam extends AbstractBeamService { ...@@ -50,9 +48,8 @@ public final class Uc1ApplicationBeam extends AbstractBeamService {
/** /**
* Main method. * Main method.
*
*/ */
@SuppressWarnings({"unchecked", "rawtypes","unused"}) @SuppressWarnings({"unchecked", "rawtypes", "unused"})
public static void main(final String[] args) { public static void main(final String[] args) {
final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args); final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment