Commit ba916c0d authored by Lorenz Boguhn

Change types of consumerConfig and types of WriterTransformations in uc1-3
parent 7aca9aa9
1 merge request: !187 Migrate Beam benchmark implementation
Uc1BeamPipeline.java
@@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
     // build KafkaConsumerConfig
-    final Map consumerConfig = buildConsumerConfig();
+    final Map<String, Object> consumerConfig = buildConsumerConfig();
 
     // Create Pipeline transformations
     final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
...
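For context, the typed assignment only compiles because the shared buildConsumerConfig() helper now returns a parameterized Map<String, Object> instead of a raw Map. Below is a minimal sketch of what such a typed builder can look like; the property keys and hard-coded values are illustrative assumptions and are not part of this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch of a typed Kafka consumer-config builder. Only the Map<String, Object>
// return type is taken from the diff; the keys and values below are placeholders.
final class ConsumerConfigSketch {

  static Map<String, Object> buildConsumerConfig() {
    final Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return consumerConfig;
  }
}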
Uc2BeamPipeline.java
@@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
@@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public final class Uc2BeamPipeline extends AbstractPipeline {
 
-  protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) {
+  protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
     // Additional needed variables
     final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
@@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
     final Duration duration = Duration.standardMinutes(windowDurationMinutes);
 
     // Build kafka configuration
-    final Map consumerConfig = buildConsumerConfig();
+    final Map<String, Object> consumerConfig = buildConsumerConfig();
 
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
@@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
     final StatsToString statsToString = new StatsToString();
 
     // Write to Kafka
-    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
-        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+    final KafkaWriterTransformation<String> kafkaWriter =
+        new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
 
     // Apply pipeline transformations
     this.apply(kafkaActivePowerRecordReader)
...
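The new declaration works because KafkaWriterTransformation is now parameterized over its value type, which makes the raw PTransform<PCollection<KV<String, String>>, POutput> declaration and the POutput import unnecessary. A rough sketch of a writer transformation with this shape, assuming it wraps Beam's KafkaIO sink, is shown below; the class name KafkaWriterSketch and its internals are illustrative, not the repository's implementation.

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative sketch of a generic Kafka writer transformation. Only the type
// parameter and the constructor arguments mirror the diff above.
public class KafkaWriterSketch<T> extends PTransform<PCollection<KV<String, T>>, PDone> {

  private static final long serialVersionUID = 1L;

  private final KafkaIO.Write<String, T> writer;

  public KafkaWriterSketch(final String bootstrapServer, final String topic,
      final Class<? extends Serializer<T>> valueSerializer) {
    super();
    // Keys are always strings; the value serializer is supplied by the caller.
    this.writer = KafkaIO.<String, T>write()
        .withBootstrapServers(bootstrapServer)
        .withTopic(topic)
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(valueSerializer);
  }

  @Override
  public PDone expand(final PCollection<KV<String, T>> input) {
    return input.apply(this.writer);
  }
}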
Uc3BeamPipeline.java
@@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
@@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     registerCoders(cr);
 
     // Read from Kafka
-    @SuppressWarnings({"rawtypes", "unchecked"})
     final KafkaActivePowerTimestampReader kafka =
         new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
 
@@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
 
     // Write to Kafka
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
-        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+    final KafkaWriterTransformation<String> kafkaWriter =
+        new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
 
     this.apply(kafka)
         // Map to correct time format
...
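With both the reader and the writer fully parameterized, the apply chain type-checks without the @SuppressWarnings({"rawtypes", "unchecked"}) annotations removed above. The hypothetical usage below reuses the KafkaWriterSketch class from the earlier sketch and replaces the real aggregation steps with an in-memory source; the topic, bootstrap server, and records are placeholders.

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative usage: writes two toy key/value pairs to Kafka via the typed
// writer sketch, with no raw-type or unchecked warnings to suppress.
public class WriterUsageSketch {

  public static void main(final String[] args) {
    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    pipeline
        .apply(Create.of(Arrays.asList(
                KV.of("sensor1", "hour-of-day-stats"),
                KV.of("sensor2", "hour-of-day-stats")))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .apply(new KafkaWriterSketch<String>("localhost:9092", "output",
            StringSerializer.class));

    pipeline.run().waitUntilFinish();
  }
}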