Skip to content
Snippets Groups Projects
Commit 149b6044 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add TimestampPolicy to uc1-2

parent fae58895
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
...@@ -5,14 +5,10 @@ import org.apache.beam.sdk.coders.AvroCoder; ...@@ -5,14 +5,10 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
...@@ -37,8 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline { ...@@ -37,8 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
final Map<String, Object> consumerConfig = buildConsumerConfig(); final Map<String, Object> consumerConfig = buildConsumerConfig();
// Create Pipeline transformations // Create Pipeline transformations
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = final KafkaActivePowerTimestampReader kafka =
new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
final LogKeyValue logKeyValue = new LogKeyValue(); final LogKeyValue logKeyValue = new LogKeyValue();
final MapToGson mapToGson = new MapToGson(); final MapToGson mapToGson = new MapToGson();
......
...@@ -11,18 +11,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; ...@@ -11,18 +11,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration; import org.joda.time.Duration;
import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation; import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
...@@ -58,9 +55,9 @@ public final class Uc2BeamPipeline extends AbstractPipeline { ...@@ -58,9 +55,9 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
// Read from Kafka // Read from Kafka
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> final KafkaActivePowerTimestampReader
kafkaActivePowerRecordReader = kafkaActivePowerRecordReader =
new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
// Transform into String // Transform into String
final StatsToString statsToString = new StatsToString(); final StatsToString statsToString = new StatsToString();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment