Skip to content
Snippets Groups Projects
Commit dede63aa authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Small code quality fixes

parent 86d4732c
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
......@@ -20,7 +20,7 @@ public class AbstractBeamService {
// Beam Pipeline
protected PipelineOptions options;
public AbstractBeamService(String[] args) {
public AbstractBeamService(final String[] args) { //NOPMD
options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(APPLICATION_NAME);
}
......@@ -29,13 +29,7 @@ public class AbstractBeamService {
/**
* Abstract main for a Beam Service.
*/
public static void main(final String[] args) {
AbstractBeamService service = new AbstractBeamService(args);
service.run();
}
public void run() {
}
public static void main(final String[] args){} //NOPMD
/**
* Builds a simple configuration for a Kafka consumer.
......@@ -43,7 +37,7 @@ public class AbstractBeamService {
* @return the build Kafka consumer configuration.
*/
public Properties buildConsumerConfig() {
Properties consumerConfig = new Properties();
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
......
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
......@@ -19,15 +17,17 @@ import titan.ccp.model.records.ActivePowerRecord;
public class KafkaAggregatedPowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
Properties consumerConfig) {
public KafkaAggregatedPowerRecordReader(final String bootstrapServer, final String inputTopic,
final Properties consumerConfig) {
super();
// Check if boostrap server and inputTopic are defined
......@@ -47,7 +47,7 @@ public class KafkaAggregatedPowerRecordReader extends
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) {
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
......
package application;
import com.google.gson.Gson;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
......@@ -16,7 +14,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.beam.AbstractBeamService;
......@@ -24,6 +21,7 @@ import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
* execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
......@@ -50,9 +48,8 @@ public final class Uc1ApplicationBeam extends AbstractBeamService {
/**
* Main method.
*
*/
@SuppressWarnings({"unchecked", "rawtypes","unused"})
@SuppressWarnings({"unchecked", "rawtypes", "unused"})
public static void main(final String[] args) {
final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment