diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index 46fa53756fb028c2bccf86d544bd8430c9ef12d6..b146ce465cdb55faf632ddd09cf15d36575fbe4b 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -5,14 +5,10 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import theodolite.commons.beam.AbstractPipeline; -import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; @@ -37,8 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline { final Map<String, Object> consumerConfig = buildConsumerConfig(); // Create Pipeline transformations - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); + final KafkaActivePowerTimestampReader kafka = + new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); final LogKeyValue logKeyValue = new LogKeyValue(); final MapToGson mapToGson = new MapToGson(); diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index 3b43dc47aaf6e3f9937aa87fca7fc1895c8fef84..e69ccce2981ace8f7c5ff48aae71ce0357dd19b2 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -11,18 +11,15 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; -import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; @@ -58,9 +55,9 @@ public final class Uc2BeamPipeline extends AbstractPipeline { // Read from Kafka - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> + final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = - new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); + new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); // Transform into String final StatsToString statsToString = new StatsToString();