Skip to content
Snippets Groups Projects
Commit 26020df8 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Refactor uc1-beam for checkstyle, pmd, spotbugs


Co-authored-by: default avatarJan Bensien <stu128012@mail.uni-kiel.de>
parent ae2553f5
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
This commit is part of merge request !187. Comments created here will be created in the context of that merge request.
...@@ -20,6 +20,8 @@ import org.apache.beam.sdk.values.PBegin; ...@@ -20,6 +20,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -30,25 +32,46 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -30,25 +32,46 @@ import titan.ccp.model.records.ActivePowerRecord;
* ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run. * Input Output in Common in the Run Configuration Start via Eclipse Run.
*/ */
public class Uc1ApplicationBeam { public final class Uc1ApplicationBeam {
@SuppressWarnings({"unchecked", "rawtypes", "serial"}) private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
private static final String INPUT = "INPUT";
private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
private static final String YES = "true";
private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES;
/**
* Private constructor to avoid instantiation.
*/
private Uc1ApplicationBeam() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
*/
@SuppressWarnings({"unchecked", "rawtypes","unused"})
public static void main(final String[] args) { public static void main(final String[] args) {
// Set Configuration for Kafka // Set Configuration for Kafka
final String bootstrapServer = final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS") System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
: "my-confluent-cp-kafka:9092"; : System.getenv(BOOTSTRAP);
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input"; final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
final String schemaRegistryURL = final String schemaRegistryUrl =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL") System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
: "http://my-confluent-cp-schema-registry:8081"; : System.getenv(SCHEMA_REGISTRY);
// Set consumer configuration for the schema registry and commits back to Kafka // Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>(); final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL); consumerConfig.put("schema.registry.url", schemaRegistryUrl);
consumerConfig.put("specific.avro.reader", "true"); consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application"); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
final LogKeyValue logKeyValue = new LogKeyValue();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName("ucapplication"); options.setJobName("ucapplication");
...@@ -70,6 +93,7 @@ public class Uc1ApplicationBeam { ...@@ -70,6 +93,7 @@ public class Uc1ApplicationBeam {
AvroCoder.of(ActivePowerRecord.class)) AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig) .withConsumerConfigUpdates(consumerConfig)
.withoutMetadata(); .withoutMetadata();
// Apply pipeline transformations // Apply pipeline transformations
// Read from Kafka // Read from Kafka
pipeline.apply(kafka) pipeline.apply(kafka)
...@@ -77,7 +101,7 @@ public class Uc1ApplicationBeam { ...@@ -77,7 +101,7 @@ public class Uc1ApplicationBeam {
.apply(MapElements .apply(MapElements
.via( .via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() { new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() {
transient Gson gsonObj = new Gson(); private transient Gson gsonObj = new Gson();
@Override @Override
public KV<String, String> apply( public KV<String, String> apply(
...@@ -90,18 +114,25 @@ public class Uc1ApplicationBeam { ...@@ -90,18 +114,25 @@ public class Uc1ApplicationBeam {
} }
})) }))
// Print to console // Print to console
.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { .apply(ParDo.of(logKeyValue));
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
System.out.println("Key: " + kv.getKey() + "Value: " + kv.getValue());
// out.output(kv);
}
}));
// Submit job and start execution // Submit job and start execution
pipeline.run().waitUntilFinish(); pipeline.run().waitUntilFinish();
}
/**
* Logs all Key Value pairs.
*/
@SuppressWarnings({"unused"})
private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
private static final long serialVersionUID = 4328743;
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue());
}
}
} }
} }
...@@ -43,8 +43,6 @@ public final class Uc1ApplicationBeam { ...@@ -43,8 +43,6 @@ public final class Uc1ApplicationBeam {
private static final String USE_AVRO_READER = YES; private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES; private static final String AUTO_COMMIT_CONFIG = YES;
/** /**
* Private constructor to avoid instantiation. * Private constructor to avoid instantiation.
*/ */
...@@ -81,7 +79,7 @@ public final class Uc1ApplicationBeam { ...@@ -81,7 +79,7 @@ public final class Uc1ApplicationBeam {
// --samzaExecutionEnvironment=STANDALONE // --samzaExecutionEnvironment=STANDALONE
// --maxSourceParallelism=1024 // --maxSourceParallelism=1024
final LoggKeys logging = new LoggKeys(); final LogKeyValue logKeyValue = new LogKeyValue();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName("ucapplication"); options.setJobName("ucapplication");
...@@ -124,16 +122,16 @@ public final class Uc1ApplicationBeam { ...@@ -124,16 +122,16 @@ public final class Uc1ApplicationBeam {
} }
})) }))
// Print to console // Print to console
.apply(ParDo.of(logging)); .apply(ParDo.of(logKeyValue));
// Start execution // Start execution
pipeline.run().waitUntilFinish(); pipeline.run().waitUntilFinish();
} }
/** /**
* Logs all Keys it reads. * Logs all Logs all Key Value pairs..
*/ */
@SuppressWarnings({"unused"}) @SuppressWarnings({"unused"})
private static class LoggKeys extends DoFn<KV<String, String>,KV<String, String>> { private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
private static final long serialVersionUID = 4328743; private static final long serialVersionUID = 4328743;
@ProcessElement @ProcessElement
...@@ -144,7 +142,6 @@ public final class Uc1ApplicationBeam { ...@@ -144,7 +142,6 @@ public final class Uc1ApplicationBeam {
} }
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment