diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index ba9d0f96e7c3b85ad1be0519f78068a645ae42f9..56e1239f6ebfde69a9482cf50331acea1ad300d5 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -1,7 +1,6 @@ package theodolite.commons.beam; import java.util.HashMap; -import java.util.Properties; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.commons.configuration2.Configuration; @@ -31,8 +30,8 @@ public class AbstractPipeline extends Pipeline { * * @return the build configuration. */ - public HashMap buildConsumerConfig() { - final HashMap consumerConfig = new HashMap(); + public HashMap<String, Object> buildConsumerConfig() { + final HashMap<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java index 1bdcd3207f1b4edf31f083aa388421ca4812e3fd..0a3867e71479e36ce30a9f222dfd0a7d473bd209 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java @@ -13,7 +13,7 @@ import org.apache.kafka.common.serialization.StringSerializer; * where the value type can be generic. * @param <T> type of the value. */ -public class KafkaWriterTransformation<T extends Serializer> extends +public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> { private static final long serialVersionUID = 3171423303843174723L; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index 4ef9a0de840ef8b11217dc593507590449d77192..cf3eb22d4c712922ce716cd1180f085e6f5e9c51 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -1,15 +1,13 @@ package application; - import com.google.common.math.StatsAccumulator; - import java.util.HashMap; import java.util.Map; import java.util.Set; - -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.SetCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.*; @@ -18,19 +16,16 @@ import org.apache.beam.sdk.values.*; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import serialization.*; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; -import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; -import theodolite.commons.beam.kafka.KafkaGenericReader; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; - /** * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload @@ -62,7 +57,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { final Duration gracePeriod = Duration.standardSeconds(grace); // Build kafka configuration - final HashMap consumerConfig = buildConsumerConfig(); + final HashMap<String, Object> consumerConfig = buildConsumerConfig(); final HashMap<String, Object> configurationConfig = configurationConfig(config); // Set Coders for Classes that will be distributed @@ -70,36 +65,35 @@ public final class Uc4BeamPipeline extends AbstractPipeline { registerCoders(cr); // Read from Kafka - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> + final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = - new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig); - -// final PTransform<PBegin, PCollection<KV<String, AggregatedActivePowerRecord>>> -// kafkaAggregatedPowerRecordReader = -// new KafkaGenericReader<String, AggregatedActivePowerRecord> -// (bootstrapServer, feedbackTopic, configurationConfig, StringDeserializer.class, -// (Class<KafkaAvroDeserializer>) KafkaAvroDeserializer.class); + new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); // Transform into AggregatedActivePowerRecords into ActivePowerRecords final AggregatedToActive aggregatedToActive = new AggregatedToActive(); // Write to Kafka - final KafkaWriterTransformation kafkaWriter = - new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); + final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput = + new KafkaWriterTransformation<>( + bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class); + + final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback = + new KafkaWriterTransformation<>( + bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class); // Apply pipeline transformations // Read from Kafka final PCollection<KV<String, ActivePowerRecord>> values = this .apply(kafkaActivePowerRecordReader) - .apply("Read Windows", Window.into(FixedWindows.of(duration))) - .apply("Set trigger for input", Window - .<KV<String, ActivePowerRecord>>configure() - .triggering(Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(triggerDelay))) - .withAllowedLateness(gracePeriod) - .discardingFiredPanes()); + .apply("Read Windows", Window.into(FixedWindows.of(duration))) + .apply("Set trigger for input", Window + .<KV<String, ActivePowerRecord>>configure() + .triggering(Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(triggerDelay))) + .withAllowedLateness(gracePeriod) + .discardingFiredPanes()); // Read the results of earlier aggregations. final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = this @@ -179,8 +173,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .apply("Filter out null values", Filter.by(filterNullValues)); - SetIdForAggregated setIdForAggregated = new SetIdForAggregated(); - SetKeyToGroup setKeyToGroup = new SetKeyToGroup(); + final SetIdForAggregated setIdForAggregated = new SetIdForAggregated(); + final SetKeyToGroup setKeyToGroup = new SetKeyToGroup(); // Aggregate for every sensor group of the current level final PCollection<KV<String, AggregatedActivePowerRecord>> @@ -200,19 +194,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via(setIdForAggregated)); - aggregations.apply("Write to aggregation results", - KafkaIO.<String, AggregatedActivePowerRecord>write() - .withBootstrapServers(bootstrapServer) - .withTopic(outputTopic) - .withKeySerializer(StringSerializer.class) - .withValueSerializer(AggregatedActivePowerRecordSerializer.class)); + aggregations.apply("Write to aggregation results", kafkaOutput); aggregations - .apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write() - .withBootstrapServers(bootstrapServer) - .withTopic(feedbackTopic) - .withKeySerializer(StringSerializer.class) - .withValueSerializer(AggregatedActivePowerRecordSerializer.class)); + .apply("Write to feedback topic", kafkaFeedback); } @@ -222,7 +207,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { * * @return the build configuration. */ - public HashMap<String, Object> configurationConfig(Configuration config) { + public HashMap<String, Object> configurationConfig(final Configuration config) { final HashMap<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); @@ -230,7 +215,6 @@ public final class Uc4BeamPipeline extends AbstractPipeline { config .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); -// final String applicationName = config.getString( ConfigurationKeys.APPLICATION_NAME) + "-configuration"; consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration"); return consumerConfig; @@ -244,7 +228,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { */ private static void registerCoders(final CoderRegistry cr) { cr.registerCoderForClass(ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.class)); + AvroCoder.of(ActivePowerRecord.class)); cr.registerCoderForClass(AggregatedActivePowerRecord.class, new AggregatedActivePowerRecordCoder()); cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));