Skip to content
Snippets Groups Projects

Migrate Beam benchmark implementation

2 files
+ 56
28
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -20,6 +20,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
@@ -30,25 +32,46 @@ import titan.ccp.model.records.ActivePowerRecord;
* ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run.
*/
public class Uc1ApplicationBeam {
@SuppressWarnings({"unchecked", "rawtypes", "serial"})
public final class Uc1ApplicationBeam {
private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
private static final String INPUT = "INPUT";
private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
private static final String YES = "true";
private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES;
/**
* Private constructor to avoid instantiation.
*/
private Uc1ApplicationBeam() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
*/
@SuppressWarnings({"unchecked", "rawtypes","unused"})
public static void main(final String[] args) {
// Set Configuration for Kafka
final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
: "my-confluent-cp-kafka:9092";
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
final String schemaRegistryURL =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
: "http://my-confluent-cp-schema-registry:8081";
System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
: System.getenv(BOOTSTRAP);
final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
final String schemaRegistryUrl =
System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
: System.getenv(SCHEMA_REGISTRY);
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put("schema.registry.url", schemaRegistryUrl);
consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
final LogKeyValue logKeyValue = new LogKeyValue();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName("ucapplication");
@@ -70,6 +93,7 @@ public class Uc1ApplicationBeam {
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
// Apply pipeline transformations
// Read from Kafka
pipeline.apply(kafka)
@@ -77,7 +101,7 @@ public class Uc1ApplicationBeam {
.apply(MapElements
.via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() {
transient Gson gsonObj = new Gson();
private transient Gson gsonObj = new Gson();
@Override
public KV<String, String> apply(
@@ -90,18 +114,25 @@ public class Uc1ApplicationBeam {
}
}))
// Print to console
.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
System.out.println("Key: " + kv.getKey() + "Value: " + kv.getValue());
// out.output(kv);
}
}));
.apply(ParDo.of(logKeyValue));
// Submit job and start execution
pipeline.run().waitUntilFinish();
}
/**
* Logs all Key Value pairs.
*/
@SuppressWarnings({"unused"})
private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
private static final long serialVersionUID = 4328743;
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue());
}
}
}
}
Loading