diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/EventTimePolicy.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/EventTimePolicy.java index 85f21b8dca4947e2babfa46b7881358ddaa23714..e568968670b3ea51388f3e2a19da8f64bd7c5391 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/EventTimePolicy.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/EventTimePolicy.java @@ -22,7 +22,7 @@ public class EventTimePolicy @Override public Instant getTimestampForRecord(final PartitionContext ctx, - final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) { + final KafkaRecord<String, ActivePowerRecord> record) { this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp()); return this.currentWatermark; } diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java index 62800cccb23769b7cccdf04a90ef9118c4182db6..3dc1a7568065deee1621dee9dc1a2606f3fa4ff3 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java @@ -31,7 +31,7 @@ public final class Uc1BeamFlink extends AbstractBeamService { final Uc1BeamFlink uc1 = new Uc1BeamFlink(args); // Create pipeline with configurations - Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); + final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); // Submit job and start execution pipeline.run().waitUntilFinish(); diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java index 75bedead7b0b202c80fa27bbf0b4557a3ebeeb5e..d7b8d5806282e97c67bf94c4e6bcc8cae27e8250 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java @@ -1,8 +1,6 @@ package application; import org.apache.beam.runners.samza.SamzaRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import theodolite.commons.beam.AbstractBeamService; /** @@ -35,7 +33,7 @@ public final class Uc1BeamSamza extends AbstractBeamService { final Uc1BeamSamza uc1 = new Uc1BeamSamza(args); // Create pipeline with configurations - Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); + final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); // Submit job and start execution pipeline.run().waitUntilFinish(); diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java index e791d136294eb902e28f03063797901d63e4971f..0cf4495325a417fa0369ae2c59bb071de250df4a 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java @@ -13,6 +13,9 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { private static final long serialVersionUID = 4328743; private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class); + /** + * Logs all key value pairs it processes. + */ @ProcessElement public void processElement(@Element final KV<String, String> kv, final OutputReceiver<KV<String, String>> out) { diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java index c83fe2a3da9473e7750346d6f84fb12fb6432796..6b0c6bc4ddfe78c22028da5b8cf7dde7ed57fced 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java @@ -5,6 +5,9 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; +/** + * Converts a Map into a json String. + */ public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> { private static final long serialVersionUID = 7168356203579050214L; private transient Gson gsonObj = new Gson(); @@ -12,9 +15,11 @@ public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV< @Override public KV<String, String> apply( final KV<String, ActivePowerRecord> kv) { + if (this.gsonObj == null) { this.gsonObj = new Gson(); } + final String gson = this.gsonObj.toJson(kv.getValue()); return KV.of(kv.getKey(), gson); } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index 992eda1612a2e3a574c6c37f7c0c6de934291348..024822080127a114955617502c44d0db46a6e366 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,8 +1,6 @@ package application; -import java.util.HashMap; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index a0ba5cb088c74d20c4b733d3ac2537e3fad2b127..f518ba8a794d33d7b5569d4a648eeadbc47a1e7b 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -2,10 +2,7 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; - -import java.util.HashMap; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index 088664aef8545eecf849d53f4c96dc04f8258631..eda0d8161750d49998b66640546b0e24ccb6256b 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -2,10 +2,7 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; - -import java.util.HashMap; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4ApplicationBeamNoFeedback.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4ApplicationBeamNoFeedback.java deleted file mode 100644 index 12558ade914c8d592db8bfc94fe748eb9979ff91..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4ApplicationBeamNoFeedback.java +++ /dev/null @@ -1,280 +0,0 @@ -package application; - -import com.google.common.base.MoreObjects; -import com.google.common.math.StatsAccumulator; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import org.apache.beam.runners.direct.DirectOptions; -import org.apache.beam.runners.direct.DirectRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.SetCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Latest; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.Duration; -import serialization.AggregatedActivePowerRecordCoder; -import serialization.AggregatedActivePowerRecordSerializer; -import serialization.EventCoder; -import serialization.EventDeserializer; -import serialization.SensorParentKeyCoder; -import theodolite.commons.beam.kafka.EventTimePolicy; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord; - -public class Uc4ApplicationBeamNoFeedback { - - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) - public static void main(final String[] args) { - - final String inputTopic = "input"; - final String outputTopic = "output"; - final String bootstrapServer = "localhost:9092"; - final String configurationTopic = "configuration"; - final String schemaRegistryURL = "http://localhost:8081"; - final Duration duration = Duration.standardSeconds(15); - - // Set consumer configuration for the schema registry and commits back to Kafka - final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put("schema.registry.url", schemaRegistryURL); - consumerConfig.put("specific.avro.reader", "true"); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input"); - - final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>(); - consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration"); - - - - final DirectOptions options = - PipelineOptionsFactory.fromArgs(args).create().as(DirectOptions.class); - options.setRunner(DirectRunner.class); - options.setJobName("ucapplication"); - options.setTargetParallelism(1); - final Pipeline pipeline = Pipeline.create(options); - final CoderRegistry cr = pipeline.getCoderRegistry(); - - // Set Coders for Classes that will be distributed - cr.registerCoderForClass(ActivePowerRecord.class, - NullableCoder.of(AvroCoder.of(ActivePowerRecord.class))); - cr.registerCoderForClass(AggregatedActivePowerRecord.class, - new AggregatedActivePowerRecordCoder()); - cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of())); - cr.registerCoderForClass(Event.class, new EventCoder()); - // SensorRegistry - cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder()); - cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); - - - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - NullableCoder.of(AvroCoder.of(ActivePowerRecord.class))) - .withConsumerConfigUpdates(consumerConfig) - // Set TimeStampPolicy for event time - .withTimestampPolicyFactory( - (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) - - .commitOffsetsInFinalize() - .withoutMetadata(); - // Apply pipeline transformations - // Read from Kafka - final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka) - .apply("Apply Winddows", Window.into(FixedWindows.of(duration))); - - - // Build the configuration stream from a changelog. - final PCollection<KV<String, Set<String>>> configurationStream = pipeline - .apply("Read sensor groups", KafkaIO.<Event, String>read() - .withBootstrapServers(bootstrapServer) - .withTopic(configurationTopic) - .withKeyDeserializer(EventDeserializer.class) - .withValueDeserializer(StringDeserializer.class) - .withConsumerConfigUpdates(consumerConfigConfiguration) - .commitOffsetsInFinalize() - .withoutMetadata()) - - .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn())) - .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs())) - .apply("Set trigger for configurations", Window - .<KV<String, Set<String>>>configure() - .triggering(Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardSeconds(5)))) - .accumulatingFiredPanes()); - // This may need to be changed to eliminate duplicates in first iteration - - - final PCollectionView<Map<String, Set<String>>> childParentPairMap = - configurationStream.apply(Latest.perKey()) - .apply(View.asMap()); - - - final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues = - values.apply( - "Duplicate as flatMap", - ParDo.of( - new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); - - // Generate a KV-pair for every child-parent match - @ProcessElement - public void processElement(@Element final KV<String, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, - final ProcessContext c) { - final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); - final Set<String> newParents = - c.sideInput(childParentPairMap).get(kv.getKey()) == null - ? Collections.emptySet() - : c.sideInput(childParentPairMap).get(kv.getKey()); - System.out.println("Map Entry for Key: " + newParents.toString()); - final Set<String> oldParents = - MoreObjects.firstNonNull(state.read(), Collections.emptySet()); - - // Forward new Pairs if they exist - if (!newParents.isEmpty()) { - for (final String parent : newParents) { - - // Forward flat mapped record - final SensorParentKey key = new SensorParentKey(kv.getKey(), parent); - out.output(KV.of(key, record)); - } - } - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - // Forward Delete - final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent); - out.output(KV.of(key, null)); - - } - } - state.write(newParents); - } - } - }).withSideInputs(childParentPairMap)) - - .apply("Debugging output before filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { - @ProcessElement - public void processElement( - @Element final KV<SensorParentKey, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - final ProcessContext c) { - System.out.println("Before filter latest Sensor: " + kv.getKey().getSensor() - + " Parent: " + kv.getKey().getParent() + " ValueKey : " - + kv.getValue().getIdentifier() + " ValueInW: " - + kv.getValue().getValueInW() - + " Timestamp: " + kv.getValue().getTimestamp()); - out.output(kv); - } - })) - .apply("Filter only latest changes", Latest.perKey()) - .apply("Debugging output after filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { - @ProcessElement - public void processElement( - @Element final KV<SensorParentKey, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - final ProcessContext c) { - System.out.println("After filter latest Sensor: " + kv.getKey().getSensor() - + " Parent: " + kv.getKey().getParent() + " ValueKey : " - + kv.getValue().getIdentifier() + " ValueInW: " - + kv.getValue().getValueInW() - + " Timestamp: " + kv.getValue().getTimestamp()); - out.output(kv); - } - })); - - - final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues - - .apply("Set key to group", MapElements.via( - new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() { - @Override - public KV<String, ActivePowerRecord> apply( - final KV<SensorParentKey, ActivePowerRecord> kv) { - System.out.println("key set to group" + kv.getKey() + "Timestamp: " - + kv.getValue().getTimestamp()); - return KV.of(kv.getKey().getParent(), kv.getValue()); - } - })) - .apply("Aggregate per group", Combine.perKey(new RecordAggregation())) - .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() { - @Override - public KV<String, AggregatedActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord( - kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(), - kv.getValue().getSumInW(), kv.getValue().getAverageInW()); - System.out.println("set identifier to: " + record.getIdentifier() + "Timestamp: " - + record.getTimestamp()); - return KV.of(kv.getKey(), record); - } - })); - - - aggregations.apply("Print Stats", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() { - - @Override - public KV<String, AggregatedActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - System.out.println("Output: Key: " - + kv.getKey() - + " Identifier: " + kv.getValue().getIdentifier() - + " Timestamp: " + kv.getValue().getTimestamp() - + " Avg: " + kv.getValue().getAverageInW() - + " Count: " + kv.getValue().getCount() - + " Sum: " + kv.getValue().getSumInW()); - // - return kv; - } - })) - .apply("Write to aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>write() - .withBootstrapServers(bootstrapServer) - .withTopic(outputTopic) - .withKeySerializer(StringSerializer.class) - .withValueSerializer(AggregatedActivePowerRecordSerializer.class)); - - pipeline.run().waitUntilFinish(); - } - -} diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java deleted file mode 100644 index 0c7ff4a5d5ff922caf2896057d9d013610eacbd0..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java +++ /dev/null @@ -1,315 +0,0 @@ -package application; - -import com.google.common.base.MoreObjects; -import com.google.common.math.StatsAccumulator; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import org.apache.beam.runners.samza.SamzaRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.SetCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Latest; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.Duration; -import serialization.AggregatedActivePowerRecordCoder; -import serialization.AggregatedActivePowerRecordSerializer; -import serialization.EventCoder; -import serialization.EventDeserializer; -import serialization.SensorParentKeyCoder; -import theodolite.commons.beam.kafka.EventTimePolicy; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord; - -/** - * Usecase implementation without the feedback. - */ -public final class Uc4ApplicationBeamNoFeedback { - - private static final String JOB_NAME = "Uc4Application"; - private static final String YES = "true"; - private static final String USE_AVRO_READER = YES; - private static final String AUTO_COMMIT_CONFIG = YES; - - private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; - private static final int DELAY = 5; - - private static final String PARENTS = "Parents: "; - private static final String VALUE_KEY = "ValueKey: "; - private static final String VALUE_IN_W = "ValueInW: "; - private static final String TIMESTAMP = "Timestamp: "; - - /** - * Private constructor to avoid instantiation. - */ - private Uc4ApplicationBeamNoFeedback() { - throw new UnsupportedOperationException(); - } - - /** - * Start executing this microservice. - */ - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) - public static void main(final String[] args) { - - final String inputTopic = "input"; - final String outputTopic = "output"; - final String bootstrapServer = "localhost:9092"; - final String configurationTopic = "configuration"; - final String schemaRegistryUrl = "http://localhost:8081"; - final Duration duration = Duration.standardSeconds(15); - - // Set consumer configuration for the schema registry and commits back to Kafka - final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - consumerConfig.put("schema.registry.url", schemaRegistryUrl); - consumerConfig.put("specific.avro.reader", USE_AVRO_READER); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input"); - - final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>(); - consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); - consumerConfigConfiguration.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration"); - - - - // final DirectOptions options = - // PipelineOptionsFactory.fromArgs(args).create().as(DirectOptions.class); - // options.setRunner(DirectRunner.class); - // options.setJobName("ucapplication"); - // options.setTargetParallelism(1); - final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - options.setRunner(SamzaRunner.class); - options.setJobName(JOB_NAME); - final Pipeline pipeline = Pipeline.create(options); - final CoderRegistry cr = pipeline.getCoderRegistry(); - - // Set Coders for Classes that will be distributed - cr.registerCoderForClass(ActivePowerRecord.class, - NullableCoder.of(AvroCoder.of(ActivePowerRecord.class))); - cr.registerCoderForClass(AggregatedActivePowerRecord.class, - new AggregatedActivePowerRecordCoder()); - cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of())); - cr.registerCoderForClass(Event.class, new EventCoder()); - // SensorRegistry - cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder()); - cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); - - - final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - NullableCoder.of(AvroCoder.of(ActivePowerRecord.class))) - .withConsumerConfigUpdates(consumerConfig) - // Set TimeStampPolicy for event time - .withTimestampPolicyFactory( - (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) - - // .commitOffsetsInFinalize() - .withoutMetadata(); - // Apply pipeline transformations - // Read from Kafka - final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka) - .apply("Apply Winddows", Window.into(FixedWindows.of(duration))); - - - // Build the configuration stream from a changelog. - final PCollection<KV<String, Set<String>>> configurationStream = pipeline - .apply("Read sensor groups", KafkaIO.<Event, String>read() - .withBootstrapServers(bootstrapServer) - .withTopic(configurationTopic) - .withKeyDeserializer(EventDeserializer.class) - .withValueDeserializer(StringDeserializer.class) - .withConsumerConfigUpdates(consumerConfigConfiguration) - // .commitOffsetsInFinalize() - .withoutMetadata()) - - .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn())) - .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs())) - .apply("Set trigger for configurations", Window - .<KV<String, Set<String>>>configure() - .triggering(Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardSeconds(DELAY)))) - .accumulatingFiredPanes()); - // This may need to be changed to eliminate duplicates in first iteration - - - final PCollectionView<Map<String, Set<String>>> childParentPairMap = - // configurationStream.apply(Latest.perKey()) - configurationStream.apply(View.asMap()); - - - final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues = - values.apply( - "Duplicate as flatMap", - ParDo.of( - new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); - - // Generate a KV-pair for every child-parent match - @ProcessElement - public void processElement(@Element final KV<String, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, - final ProcessContext c) { - final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); - final Set<String> newParents = - c.sideInput(childParentPairMap).get(kv.getKey()) == null - ? Collections.emptySet() - : c.sideInput(childParentPairMap).get(kv.getKey()); - System.out.println("Map Entry for Key: " + newParents.toString()); - final Set<String> oldParents = - MoreObjects.firstNonNull(state.read(), Collections.emptySet()); - - // Forward new Pairs if they exist - if (!newParents.isEmpty()) { - for (final String parent : newParents) { - - // Forward flat mapped record - final SensorParentKey key = new SensorParentKey(kv.getKey(), parent); - out.output(KV.of(key, record)); - } - } - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - // Forward Delete - final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent); - out.output(KV.of(key, null)); - - } - } - state.write(newParents); - } - } - }).withSideInputs(childParentPairMap)) - - .apply("Debugging output before filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, - KV<SensorParentKey, ActivePowerRecord>>() { - @ProcessElement - public void processElement( - @Element final KV<SensorParentKey, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - final ProcessContext c) { - System.out.println("Before filter latest Sensor: " + kv.getKey().getSensor() - + PARENTS + kv.getKey().getParent() + VALUE_KEY - + kv.getValue().getIdentifier() + VALUE_IN_W - + kv.getValue().getValueInW() - + TIMESTAMP + kv.getValue().getTimestamp()); - out.output(kv); - } - })) - .apply("Filter only latest changes", Latest.perKey()) - .apply("Debugging output after filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, - KV<SensorParentKey, ActivePowerRecord>>() { - @ProcessElement - public void processElement( - @Element final KV<SensorParentKey, ActivePowerRecord> kv, - final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - final ProcessContext c) { - System.out.println("After filter latest Sensor: " + kv.getKey().getSensor() - + PARENTS + kv.getKey().getParent() + VALUE_KEY - + kv.getValue().getIdentifier() + VALUE_IN_W - + kv.getValue().getValueInW() - + TIMESTAMP + kv.getValue().getTimestamp()); - out.output(kv); - } - })); - - - final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues - - .apply("Set key to group", MapElements.via( - new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, - KV<String, ActivePowerRecord>>() { - @Override - public KV<String, ActivePowerRecord> apply( - final KV<SensorParentKey, ActivePowerRecord> kv) { - System.out.println("key set to group" + kv.getKey() + TIMESTAMP - + kv.getValue().getTimestamp()); - return KV.of(kv.getKey().getParent(), kv.getValue()); - } - })) - .apply("Aggregate per group", Combine.perKey(new RecordAggregation())) - .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, - KV<String, AggregatedActivePowerRecord>>() { - @Override - public KV<String, AggregatedActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord( - kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(), - kv.getValue().getSumInW(), kv.getValue().getAverageInW()); - System.out.println("set identifier to: " + record.getIdentifier() + TIMESTAMP - + record.getTimestamp()); - return KV.of(kv.getKey(), record); - } - })); - - - aggregations.apply("Print Stats", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, - KV<String, AggregatedActivePowerRecord>>() { - - @Override - public KV<String, AggregatedActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - System.out.println("Output: Key: " - + kv.getKey() - + " Identifier: " + kv.getValue().getIdentifier() - + TIMESTAMP + kv.getValue().getTimestamp() - + " Avg: " + kv.getValue().getAverageInW() - + " Count: " + kv.getValue().getCount() - + " Sum: " + kv.getValue().getSumInW()); - // - return kv; - } - })) - .apply("Write to aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>write() - .withBootstrapServers(bootstrapServer) - .withTopic(outputTopic) - .withKeySerializer(StringSerializer.class) - .withValueSerializer(AggregatedActivePowerRecordSerializer.class)); - - pipeline.run().waitUntilFinish(); - } - -} diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index e93f0da2271b894f65adc03803aa7448a0d0a3cc..7d03a3e00996f11a00f6b73d440ad4d7ed819de4 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -1,4 +1,4 @@ -package application; +package application; //NOPMD import com.google.common.math.StatsAccumulator; import java.util.HashMap; @@ -55,7 +55,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public final class Uc4BeamPipeline extends AbstractPipeline { - protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) { + protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) { //NOPMD super(options, config); // Additional needed variables