Skip to content
Snippets Groups Projects
Commit 2a295817 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Refactor uc1-beam for checkstyle, pmd, spotbugs


Co-authored-by: default avatarJan Bensien <stu128012@mail.uni-kiel.de>
parent cbe27e6f
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
......@@ -20,6 +20,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
......@@ -30,25 +32,46 @@ import titan.ccp.model.records.ActivePowerRecord;
* ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run.
*/
public class Uc1ApplicationBeam {
@SuppressWarnings({"unchecked", "rawtypes", "serial"})
public final class Uc1ApplicationBeam {
private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
private static final String INPUT = "INPUT";
private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
private static final String YES = "true";
private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES;
/**
* Private constructor to avoid instantiation.
*/
private Uc1ApplicationBeam() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
*/
@SuppressWarnings({"unchecked", "rawtypes","unused"})
public static void main(final String[] args) {
// Set Configuration for Kafka
final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
: "my-confluent-cp-kafka:9092";
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
final String schemaRegistryURL =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
: "http://my-confluent-cp-schema-registry:8081";
System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
: System.getenv(BOOTSTRAP);
final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
final String schemaRegistryUrl =
System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
: System.getenv(SCHEMA_REGISTRY);
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put("schema.registry.url", schemaRegistryUrl);
consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
final LogKeyValue logKeyValue = new LogKeyValue();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName("ucapplication");
......@@ -70,6 +93,7 @@ public class Uc1ApplicationBeam {
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
// Apply pipeline transformations
// Read from Kafka
pipeline.apply(kafka)
......@@ -77,7 +101,7 @@ public class Uc1ApplicationBeam {
.apply(MapElements
.via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() {
transient Gson gsonObj = new Gson();
private transient Gson gsonObj = new Gson();
@Override
public KV<String, String> apply(
......@@ -90,18 +114,25 @@ public class Uc1ApplicationBeam {
}
}))
// Print to console
.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
System.out.println("Key: " + kv.getKey() + "Value: " + kv.getValue());
// out.output(kv);
}
}));
.apply(ParDo.of(logKeyValue));
// Submit job and start execution
pipeline.run().waitUntilFinish();
}
/**
* Logs all Key Value pairs.
*/
@SuppressWarnings({"unused"})
private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
private static final long serialVersionUID = 4328743;
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue());
}
}
}
}
......@@ -43,8 +43,6 @@ public final class Uc1ApplicationBeam {
private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES;
/**
* Private constructor to avoid instantiation.
*/
......@@ -81,7 +79,7 @@ public final class Uc1ApplicationBeam {
// --samzaExecutionEnvironment=STANDALONE
// --maxSourceParallelism=1024
final LoggKeys logging = new LoggKeys();
final LogKeyValue logKeyValue = new LogKeyValue();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName("ucapplication");
......@@ -124,16 +122,16 @@ public final class Uc1ApplicationBeam {
}
}))
// Print to console
.apply(ParDo.of(logging));
.apply(ParDo.of(logKeyValue));
// Start execution
pipeline.run().waitUntilFinish();
}
/**
* Logs all Keys it reads.
* Logs all Logs all Key Value pairs..
*/
@SuppressWarnings({"unused"})
private static class LoggKeys extends DoFn<KV<String, String>,KV<String, String>> {
private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
private static final long serialVersionUID = 4328743;
@ProcessElement
......@@ -144,7 +142,6 @@ public final class Uc1ApplicationBeam {
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment