diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java new file mode 100644 index 0000000000000000000000000000000000000000..3ec03cbcc56a022177af18ed48dff128b11ca098 --- /dev/null +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -0,0 +1,56 @@ +package theodolite.commons.beam.kafka; + +import java.util.Map; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; + +/** + * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. + * + * @param <K> Type of the Key. + * @param <V> Type of the Value. + */ +public class KafkaGenericReader<K, V> extends + PTransform<PBegin, PCollection<KV<K, V>>> { + + private static final long serialVersionUID = 2603286150183186115L; + private final PTransform<PBegin, PCollection<KV<K, V>>> reader; + + /** + * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public KafkaGenericReader(final String bootstrapServer, final String inputTopic, + final Map consumerConfig, + final Class<? extends + org.apache.kafka.common.serialization.Deserializer<K>> + keyDeserializer, + final Class<? extends + org.apache.kafka.common.serialization.Deserializer<V>> + valueDeserializer) { + super(); + + // Check if boostrap server and inputTopic are defined + if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { + throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); + } + + reader = + KafkaIO.<K, V>read() + .withBootstrapServers(bootstrapServer) + .withTopic(inputTopic) + .withKeyDeserializer(keyDeserializer) + .withValueDeserializer(valueDeserializer) + .withConsumerConfigUpdates(consumerConfig) + .withoutMetadata(); + } + + @Override + public PCollection<KV<K, V>> expand(final PBegin input) { + return input.apply(this.reader); + } + +} diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index 7d03a3e00996f11a00f6b73d440ad4d7ed819de4..78083db125f93a6e48878d07a293aa3a227f122d 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -40,6 +40,7 @@ import serialization.SensorParentKeyCoder; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; +import theodolite.commons.beam.kafka.KafkaGenericReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; @@ -84,10 +85,18 @@ public final class Uc4BeamPipeline extends AbstractPipeline { registerCoders(cr); // Read from Kafka + // ActivePowerRecords final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); + //Configuration Events + final KafkaGenericReader<Event, String> + kafkaConfigurationReader = + new KafkaGenericReader<>( + bootstrapServer, configurationTopic, configurationConfig, + EventDeserializer.class, StringDeserializer.class); + // Transform into AggregatedActivePowerRecords into ActivePowerRecords final AggregatedToActive aggregatedToActive = new AggregatedToActive(); @@ -100,11 +109,9 @@ public final class Uc4BeamPipeline extends AbstractPipeline { new KafkaWriterTransformation<>( bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class); - // Apply pipeline transformations - // Read from Kafka final PCollection<KV<String, ActivePowerRecord>> values = this - .apply(kafkaActivePowerRecordReader) + .apply("Read from Kafka", kafkaActivePowerRecordReader) .apply("Read Windows", Window.into(FixedWindows.of(duration))) .apply("Set trigger for input", Window .<KV<String, ActivePowerRecord>>configure() @@ -148,13 +155,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Build the configuration stream from a changelog. final PCollection<KV<String, Set<String>>> configurationStream = this - .apply("Read sensor groups", KafkaIO.<Event, String>read() - .withBootstrapServers(bootstrapServer) - .withTopic(configurationTopic) - .withKeyDeserializer(EventDeserializer.class) - .withValueDeserializer(StringDeserializer.class) - .withConsumerConfigUpdates(configurationConfig) - .withoutMetadata()) + .apply("Read sensor groups", kafkaConfigurationReader) // Only forward relevant changes in the hierarchy .apply("Filter changed and status events", Filter.by(new FilterEvents()))