diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java new file mode 100644 index 0000000000000000000000000000000000000000..f5fe21f805646531c4c4070a5562089ed48193e6 --- /dev/null +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java @@ -0,0 +1,58 @@ +package theodolite.commons.beam.kafka; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.StringDeserializer; +import titan.ccp.model.records.ActivePowerRecord; + +import java.util.Properties; + +/** + * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}. + * Additionally has a TimestampPolicy. + */ +public class KafkaActivePowerTimestampReader extends + PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { + + private static final long serialVersionUID = 2603286150183186115L; + private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; + + + /** + * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. 
+ */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, + final Properties consumerConfig) { + super(); + + // Check if bootstrap server and inputTopic are defined + if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { + throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); + } + + reader = + KafkaIO.<String, ActivePowerRecord>read() + .withBootstrapServers(bootstrapServer) + .withTopic(inputTopic) + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, + AvroCoder.of(ActivePowerRecord.class)) + .withConsumerConfigUpdates(consumerConfig) + // Set TimeStampPolicy for event time + .withTimestampPolicyFactory( + (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) + .withoutMetadata(); + } + + @Override + public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) { + return input.apply(this.reader); + } + +} diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java index 0f74437d0e8c0d0f54de071f6f70ad73cef7f171..18532b2655fcc6c24dad5f2fca87607c0b5d2e54 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java @@ -1,36 +1,7 @@ package application; -import com.google.common.math.Stats; -import com.google.common.math.StatsAccumulator; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.HashMap; -import java.util.Map; import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; 
-import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.Duration; import theodolite.commons.beam.AbstractBeamService; -import titan.ccp.model.records.ActivePowerRecord; /** * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the @@ -56,9 +27,10 @@ public final class Uc3BeamFlink extends AbstractBeamService { */ public static void main(final String[] args) { - Uc3BeamFlink uc3BeamFlink = new Uc3BeamFlink(args); + final Uc3BeamFlink uc3BeamFlink = new Uc3BeamFlink(args); - Uc3BeamPipeline pipeline = new Uc3BeamPipeline(uc3BeamFlink.options, uc3BeamFlink.getConfig()); + final Uc3BeamPipeline pipeline = + new Uc3BeamPipeline(uc3BeamFlink.options, uc3BeamFlink.getConfig()); pipeline.run().waitUntilFinish(); } diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java index b2d037419341cb7ca1956d8d6d458861ea4aa56e..46232b3f13601d77f6cb7b13ea0bcdc31290357a 100644 --- 
a/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java @@ -5,9 +5,11 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; /** - * + * {@link SimpleFunction} that transforms into the sensorId and the Value. */ -public class HourOfDayWithStats extends SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>> { +public class HourOfDayWithStats extends + SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>> { + private static final long serialVersionUID = -7411154345437422919L; private final HourOfDayKeyFactory keyFactory = new HourOfDayKeyFactory(); @Override diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index e5a190fd92776d2110f9b894678d4a11f0a381ed..504a6f286f26a96bfc403783b5183eb2949b1e79 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -2,30 +2,28 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.*; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import 
org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.commons.configuration2.Configuration; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; -import theodolite.commons.beam.kafka.EventTimePolicy; -import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; @@ -55,7 +53,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final int triggerInterval = Integer.parseInt( config.getString(ConfigurationKeys.TRIGGER_INTERVAL)); - final Duration triggerDelay = Duration.standardDays(aggregationAdvance); + final Duration triggerDelay = Duration.standardSeconds(triggerInterval); // Build kafka configuration final Properties consumerConfig = buildConsumerConfig(); @@ -64,43 +62,29 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final CoderRegistry cr = this.getCoderRegistry(); registerCoders(cr); - // Read from Kafka - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> - kafkaActivePowerRecordReader = - new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); - @SuppressWarnings({"rawtypes", "unchecked"}) - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - 
.withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - AvroCoder.of(ActivePowerRecord.class)) - .withConsumerConfigUpdates(consumerConfig) - // Set TimeStampPolicy for event time - .withTimestampPolicyFactory( - (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) - .withoutMetadata(); - - - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + final KafkaActivePowerTimestampReader kafka = + new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); + // Map the time format final MapTimeFormat mapTimeFormat = new MapTimeFormat(); + // get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); // Write to Kafka + @SuppressWarnings({"rawtypes", "unchecked"}) final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); this.apply(kafka) // Map to correct time format - .apply(MapElements.via(new MapTimeFormat())) + .apply(MapElements.via(mapTimeFormat)) // Apply a sliding window .apply(Window - .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(aggregationAdvanceDuration)) + .<KV<HourOfDayKey, ActivePowerRecord>> + into(SlidingWindows.of(duration).every(aggregationAdvanceDuration)) .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) @@ -122,6 +106,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { /** * Registers all Coders for all needed Coders. + * * @param cr CoderRegistry. */ private static void registerCoders(final CoderRegistry cr) {