diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java index 394af1e9bba5cd94883b7bd626242ba791109d47..a4bf080cfaf502f881cab42bb8b578d7b0834dae 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java @@ -1,7 +1,7 @@ package application; - import java.util.Optional; + import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.io.kafka.TimestampPolicy; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java index ade4f34b7da9aad2d93e47110b22f6a9770a07f9..da389521f7f9157b736dc4f559cbfa64cab0aaa9 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java @@ -1,6 +1,5 @@ package application; - import java.util.Optional; import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.io.kafka.TimestampPolicy; diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java index b2d6b8e3d743b3e8dacc72fd10ee5f3d194d24bf..f58ebde52de5da2a07a5280d1e361a524ca6a2e7 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java @@ -16,13 +16,18 @@ import titan.ccp.model.sensorregistry.Sensor; import titan.ccp.model.sensorregistry.SensorRegistry; /** - * DoFn class to generate a child-parent pair for every sensor in the hierarchie. + * DoFn class to generate a child-parent pair for every sensor in the hierarchy. */ public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<String>>> { private static final long serialVersionUID = 958270648688932091L; + /** + * Transforms a parent [children] map of sensors to a child [parents] map. + * @param kv input map. + * @param out outputstream. + */ @ProcessElement public void processElement(@Element final KV<Event, String> kv, final OutputReceiver<KV<String, Set<String>>> out) { @@ -42,7 +47,7 @@ public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<St child -> child.getIdentifier(), child -> child.getParent() .map(p -> Stream.of(p.getIdentifier()).collect(Collectors.toSet())) - .orElseGet(() -> Collections.<String>emptySet()))); + .orElse(Collections.<String>emptySet()))); } private Stream<Sensor> streamAllChildren(final AggregatedSensor sensor) { diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java index d35ef32ea36298e957bde635c601464db502515f..3cb79a11a78b331e4fdc02b51759b8dac559d453 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java @@ -10,7 +10,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; /** - * CombineFn to aggregate ActivePowerRecords into AggregatedActivePowerRecords + * CombineFn to aggregate ActivePowerRecords into AggregatedActivePowerRecords. */ public class RecordAggregation extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> { @@ -19,12 +19,15 @@ public class RecordAggregation private static final long serialVersionUID = 4362213539553233529L; + /** + * Wrapper for an accumulation of records. + */ @DefaultCoder(AvroCoder.class) public static class Accum implements Serializable { private static final long serialVersionUID = 3701311203919534376L; - long count = 0; - Double sum = 0.0; - long timestamp = 0; + protected long count = 0; + protected Double sum = 0.0; + protected long timestamp = 0; } @Override diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java index 7604d5f87f709bc710120a82a616262ecdd614d5..2bef6aab9d9b26320ba0619d952de24d5e56a000 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java @@ -54,57 +54,80 @@ import serialization.EventDeserializer; import serialization.SensorParentKeyCoder; import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord;; - +import titan.ccp.model.records.AggregatedActivePowerRecord; + +/** + * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza + * Runner. To run locally in standalone start Kafka, Zookeeper, the schema-registry and the + * workload generator using the delayed_startup.sh script. Add + * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory + * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties + * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To + * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File + * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run. + */ public class Uc4ApplicationBeam { + private static final String JOB_NAME = "Uc4Application"; + private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS"; + private static final String INPUT = "INPUT"; + private static final String OUTPUT = "OUTPUT"; + private static final String CONFIGURATION = "CONFIGURATION"; + private static final String FEEDBACKTOPIC = "FEEDBACKTOPIC"; + private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL"; + private static final String YES = "true"; + private static final String USE_AVRO_READER = YES; + private static final String AUTO_COMMIT_CONFIG = YES; + private static final String KAFKA_WINDOW_DURATION = "KAFKA_WINDOW_DURATION"; + private static final String TRIGGER_INTERVAL = "TRIGGER_INTERVAL"; + private static final String GRACE_PERIOD = "GRACE_PERIOD"; + private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; + + + /** - * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza - * Runner. To run locally in standalone start Kafka, Zookeeper, the schema-registry and the - * workload generator using the delayed_startup.sh script. Add - * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory - * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties - * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To - * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File - * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run. + * Private constructor to avoid instantiation. */ + private Uc4ApplicationBeam() { + throw new UnsupportedOperationException(); + } + /** + * Start executing this microservice. + */ @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { // Set Configuration for Windows final int windowDuration = Integer.parseInt( - System.getenv("KAFKA_WINDOW_DURATION") != null - ? System.getenv("KAFKA_WINDOW_DURATION") - : "60"); + System.getenv(KAFKA_WINDOW_DURATION) == null + ? "60" : System.getenv(KAFKA_WINDOW_DURATION)); final Duration duration = Duration.standardSeconds(windowDuration); final int triggerInterval = Integer.parseInt( - System.getenv("TRIGGER_INTERVAL") != null - ? System.getenv("TRIGGER_INTERVAL") - : "30"); + System.getenv(TRIGGER_INTERVAL) == null + ? "30" : System.getenv(TRIGGER_INTERVAL)); final Duration triggerDelay = Duration.standardSeconds(triggerInterval); final int grace = Integer.parseInt( - System.getenv("GRACE_PERIOD") != null - ? System.getenv("GRACE_PERIOD") - : "270"); + System.getenv(GRACE_PERIOD) == null + ? "270" : System.getenv(GRACE_PERIOD)); final Duration gracePeriod = Duration.standardSeconds(grace); // Set Configuration for Kafka final String bootstrapServer = - System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS") - : "my-confluent-cp-kafka:9092"; - final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input"; - final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output"; + System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092" + : System.getenv(BOOTSTRAP); + final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT); + final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT); final String configurationTopic = - System.getenv("CONFIGURATION") != null ? System.getenv("CONFIGURATION") : "configuration"; + System.getenv(CONFIGURATION) == null ? "configuration" : System.getenv(CONFIGURATION); final String feedbackTopic = - System.getenv("FEEDBACKTOPIC") != null ? System.getenv("FEEDBACKTOPIC") - : "aggregation-feedback"; - final String schemaRegistryURL = - System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL") - : "http://my-confluent-cp-schema-registry:8081"; + System.getenv(FEEDBACKTOPIC) == null ? "aggregation-feedback" + : System.getenv(FEEDBACKTOPIC); + final String schemaRegistryUrl = + System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081" + : System.getenv(SCHEMA_REGISTRY); // final String inputTopic = "input"; @@ -119,21 +142,22 @@ public class Uc4ApplicationBeam { // Set consumer configuration for the schema registry and commits back to Kafka final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put("schema.registry.url", schemaRegistryURL); - consumerConfig.put("specific.avro.reader", "true"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); + consumerConfig.put("schema.registry.url", schemaRegistryUrl); + consumerConfig.put("specific.avro.reader", USE_AVRO_READER); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input"); final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>(); - consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); + consumerConfigConfiguration.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration"); // Create run options from args final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); options.setRunner(SamzaRunner.class); - options.setJobName("ucapplication"); + options.setJobName(JOB_NAME); final Pipeline pipeline = Pipeline.create(options); final CoderRegistry cr = pipeline.getCoderRegistry(); @@ -166,7 +190,7 @@ public class Uc4ApplicationBeam { // Apply pipeline transformations // Read from Kafka final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka) - .apply("Apply Winddows", Window.into(FixedWindows.of(duration))) + .apply("Read Windows", Window.into(FixedWindows.of(duration))) .apply("Set trigger for input", Window .<KV<String, ActivePowerRecord>>configure() .triggering(Repeatedly.forever( @@ -184,19 +208,21 @@ public class Uc4ApplicationBeam { .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class) .withTimestampPolicyFactory( (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy( - previousWaterMark)) + previousWaterMark)) .withoutMetadata()) .apply("Apply Winddows", Window.into(FixedWindows.of(duration))) // Convert into the correct data format - .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>>() { - @Override - public KV<String, ActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(), - kv.getValue().getTimestamp(), kv.getValue().getSumInW())); - } - })) + .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", + MapElements.via( + new SimpleFunction<KV<String, AggregatedActivePowerRecord>, + KV<String, ActivePowerRecord>>() { + @Override + public KV<String, ActivePowerRecord> apply( + final KV<String, AggregatedActivePowerRecord> kv) { + return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(), + kv.getValue().getTimestamp(), kv.getValue().getSumInW())); + } + })) .apply("Set trigger for feedback", Window .<KV<String, ActivePowerRecord>>configure() .triggering(Repeatedly.forever( @@ -309,9 +335,11 @@ public class Uc4ApplicationBeam { } })); // Aggregate for every sensor group of the current level - final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues + final PCollection<KV<String, AggregatedActivePowerRecord>> + aggregations = flatMappedValues .apply("Set key to group", MapElements.via( - new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() { + new SimpleFunction<KV<SensorParentKey, + ActivePowerRecord>, KV<String, ActivePowerRecord>>() { @Override public KV<String, ActivePowerRecord> apply( final KV<SensorParentKey, ActivePowerRecord> kv) { @@ -328,18 +356,22 @@ public class Uc4ApplicationBeam { .withAllowedLateness(gracePeriod) .discardingFiredPanes()) - .apply("Aggregate per group", Combine.perKey(new RecordAggregation())) - .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() { - @Override - public KV<String, AggregatedActivePowerRecord> apply( - final KV<String, AggregatedActivePowerRecord> kv) { - final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord( - kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(), - kv.getValue().getSumInW(), kv.getValue().getAverageInW()); - return KV.of(kv.getKey(), record); - } - })); + .apply( + "Aggregate per group", + Combine.perKey(new RecordAggregation())) + .apply("Set the Identifier in AggregatedActivePowerRecord", + MapElements.via( + new SimpleFunction<KV<String, AggregatedActivePowerRecord>, + KV<String, AggregatedActivePowerRecord>>() { + @Override + public KV<String, AggregatedActivePowerRecord> apply( + final KV<String, AggregatedActivePowerRecord> kv) { + final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord( + kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(), + kv.getValue().getSumInW(), kv.getValue().getAverageInW()); + return KV.of(kv.getKey(), record); + } + })); diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java index e528a836c75d111f602379e9567244255af79a76..4e2efbd2ec26b61b5e48c5b3aa9dd67d978cf35c 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java @@ -49,8 +49,37 @@ import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; -public class Uc4ApplicationBeamNoFeedback { +/** + * Usecase implementation without the feedback. + */ +public final class Uc4ApplicationBeamNoFeedback { + private static final String JOB_NAME = "Uc4Application"; + private static final String YES = "true"; + private static final String USE_AVRO_READER = YES; + private static final String AUTO_COMMIT_CONFIG = YES; + + private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; + private static final int DELAY = 5; + + private static final String PARENTS = "Parents: "; + private static final String VALUE_KEY = "ValueKey: "; + private static final String VALUE_IN_W = "ValueInW: "; + private static final String TIMESTAMP = "Timestamp: "; + + + + + /** + * Private constructor to avoid instantiation. + */ + private Uc4ApplicationBeamNoFeedback() { + throw new UnsupportedOperationException(); + } + + /** + * Start executing this microservice. + */ @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { @@ -58,20 +87,21 @@ public class Uc4ApplicationBeamNoFeedback { final String outputTopic = "output"; final String bootstrapServer = "localhost:9092"; final String configurationTopic = "configuration"; - final String schemaRegistryURL = "http://localhost:8081"; + final String schemaRegistryUrl = "http://localhost:8081"; final Duration duration = Duration.standardSeconds(15); // Set consumer configuration for the schema registry and commits back to Kafka final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put("schema.registry.url", schemaRegistryURL); - consumerConfig.put("specific.avro.reader", "true"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); + consumerConfig.put("schema.registry.url", schemaRegistryUrl); + consumerConfig.put("specific.avro.reader", USE_AVRO_READER); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input"); final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>(); - consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); + consumerConfigConfiguration.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration"); @@ -83,7 +113,7 @@ public class Uc4ApplicationBeamNoFeedback { // options.setTargetParallelism(1); final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); options.setRunner(SamzaRunner.class); - options.setJobName("ucapplication"); + options.setJobName(JOB_NAME); final Pipeline pipeline = Pipeline.create(options); final CoderRegistry cr = pipeline.getCoderRegistry(); @@ -136,7 +166,7 @@ public class Uc4ApplicationBeamNoFeedback { .<KV<String, Set<String>>>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardSeconds(5)))) + .plusDelayOf(Duration.standardSeconds(DELAY)))) .accumulatingFiredPanes()); // This may need to be changed to eliminate duplicates in first iteration @@ -193,33 +223,35 @@ public class Uc4ApplicationBeamNoFeedback { }).withSideInputs(childParentPairMap)) .apply("Debugging output before filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { + new DoFn<KV<SensorParentKey, ActivePowerRecord>, + KV<SensorParentKey, ActivePowerRecord>>() { @ProcessElement public void processElement( @Element final KV<SensorParentKey, ActivePowerRecord> kv, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, final ProcessContext c) { System.out.println("Before filter latest Sensor: " + kv.getKey().getSensor() - + " Parent: " + kv.getKey().getParent() + " ValueKey : " - + kv.getValue().getIdentifier() + " ValueInW: " + + PARENTS + kv.getKey().getParent() + VALUE_KEY + + kv.getValue().getIdentifier() + VALUE_IN_W + kv.getValue().getValueInW() - + " Timestamp: " + kv.getValue().getTimestamp()); + + TIMESTAMP + kv.getValue().getTimestamp()); out.output(kv); } })) .apply("Filter only latest changes", Latest.perKey()) .apply("Debugging output after filtering latest", ParDo.of( - new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() { + new DoFn<KV<SensorParentKey, ActivePowerRecord>, + KV<SensorParentKey, ActivePowerRecord>>() { @ProcessElement public void processElement( @Element final KV<SensorParentKey, ActivePowerRecord> kv, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, final ProcessContext c) { System.out.println("After filter latest Sensor: " + kv.getKey().getSensor() - + " Parent: " + kv.getKey().getParent() + " ValueKey : " - + kv.getValue().getIdentifier() + " ValueInW: " + + PARENTS + kv.getKey().getParent() + VALUE_KEY + + kv.getValue().getIdentifier() + VALUE_IN_W + kv.getValue().getValueInW() - + " Timestamp: " + kv.getValue().getTimestamp()); + + TIMESTAMP + kv.getValue().getTimestamp()); out.output(kv); } })); @@ -228,25 +260,27 @@ public class Uc4ApplicationBeamNoFeedback { final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues .apply("Set key to group", MapElements.via( - new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() { + new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, + KV<String, ActivePowerRecord>>() { @Override public KV<String, ActivePowerRecord> apply( final KV<SensorParentKey, ActivePowerRecord> kv) { - System.out.println("key set to group" + kv.getKey() + "Timestamp: " + System.out.println("key set to group" + kv.getKey() + TIMESTAMP + kv.getValue().getTimestamp()); return KV.of(kv.getKey().getParent(), kv.getValue()); } })) .apply("Aggregate per group", Combine.perKey(new RecordAggregation())) .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() { + new SimpleFunction<KV<String, AggregatedActivePowerRecord>, + KV<String, AggregatedActivePowerRecord>>() { @Override public KV<String, AggregatedActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord( kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(), kv.getValue().getSumInW(), kv.getValue().getAverageInW()); - System.out.println("set identifier to: " + record.getIdentifier() + "Timestamp: " + System.out.println("set identifier to: " + record.getIdentifier() + TIMESTAMP + record.getTimestamp()); return KV.of(kv.getKey(), record); } @@ -254,7 +288,8 @@ public class Uc4ApplicationBeamNoFeedback { aggregations.apply("Print Stats", MapElements.via( - new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() { + new SimpleFunction<KV<String, AggregatedActivePowerRecord>, + KV<String, AggregatedActivePowerRecord>>() { @Override public KV<String, AggregatedActivePowerRecord> apply( @@ -262,7 +297,7 @@ public class Uc4ApplicationBeamNoFeedback { System.out.println("Output: Key: " + kv.getKey() + " Identifier: " + kv.getValue().getIdentifier() - + " Timestamp: " + kv.getValue().getTimestamp() + + TIMESTAMP + kv.getValue().getTimestamp() + " Avg: " + kv.getValue().getAverageInW() + " Count: " + kv.getValue().getCount() + " Sum: " + kv.getValue().getSumInW()); diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java index d5d4d3cd5f5c09d4261a097c8500b1f44f59ceb9..cabd7acf573752c05c8ecd18a639efa21bc4accb 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java @@ -8,7 +8,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; /** - * Forward changes or tombstone values for deleted records + * Forward changes or tombstone values for deleted records. */ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> { @@ -19,7 +19,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str StateSpecs.value(); - + /** + * Match the changes accordingly. + * @param kv the sensor parents set that contains the changes. + */ @ProcessElement public void processElement(@Element final KV<String, Set<String>> kv, final OutputReceiver<KV<String, Set<String>>> out, diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java index 6de16875201382e45bf616980b0ebf698e57749e..f4619ec9b7920b573126de4f32e3b141d8c53bf3 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java @@ -2,8 +2,7 @@ package serialization; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; + import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Serializer; import titan.ccp.model.records.AggregatedActivePowerRecord; @@ -14,10 +13,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; public class AggregatedActivePowerRecordSerializer implements Serializer<AggregatedActivePowerRecord> { - private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); // Gab - // Fehler:/home/jan/jan-bensien-bsc/uc2-application-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java:9: + // Fehler:/home/jan/jan-bensien-bsc/uc2-application-samza/src/main + // /java/serialization/AggregatedActivePowerRecordSerializer.java:9: // error: AggregatedActivePowerRecordSerializer is not abstract and does not override abstract // method close() in Serializer // public class AggregatedActivePowerRecordSerializer implements Serializer @@ -27,7 +27,7 @@ public class AggregatedActivePowerRecordSerializer public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { - this.avroEnCoder.encode(data,out); + this.avroEnCoder.encode(data, out); } catch (IOException e) { e.printStackTrace(); } @@ -39,4 +39,9 @@ public class AggregatedActivePowerRecordSerializer } return result; } + + @Override + public void close() { + Serializer.super.close(); + } } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java index d442e30a58d0a5df93c0c837720badba1c5417df..128ead251d437d2df9ba3858e90b86878e610ac4 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java @@ -5,6 +5,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -17,6 +18,8 @@ import titan.ccp.configuration.events.EventSerde; */ public class EventCoder extends Coder<Event> implements Serializable { private static final long serialVersionUID = 8403045343970659100L; + private static final int VALUE_SIZE = 4; + private transient Serde<Event> innerSerde = EventSerde.serde(); @Override @@ -26,7 +29,7 @@ public class EventCoder extends Coder<Event> implements Serializable { this.innerSerde = EventSerde.serde(); } final byte[] bytes = this.innerSerde.serializer().serialize("ser", value); - final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array(); + final byte[] sizeinBytes = ByteBuffer.allocate(VALUE_SIZE).putInt(bytes.length).array(); outStream.write(sizeinBytes); outStream.write(bytes); } @@ -36,7 +39,7 @@ public class EventCoder extends Coder<Event> implements Serializable { if (this.innerSerde == null) { this.innerSerde = EventSerde.serde(); } - final byte[] sizeinBytes = new byte[4]; + final byte[] sizeinBytes = new byte[VALUE_SIZE]; inStream.read(sizeinBytes); final int size = ByteBuffer.wrap(sizeinBytes).getInt(); final byte[] bytes = new byte[size]; @@ -46,7 +49,7 @@ public class EventCoder extends Coder<Event> implements Serializable { @Override public List<? extends Coder<?>> getCoderArguments() { - return null; + return Collections.emptyList(); } @Override diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java index f5c60380a5ab52bc9c871840f59decf41cf374a3..34e31a3059d0749848a30979f32e6df6651c1b47 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java @@ -5,6 +5,9 @@ import org.apache.kafka.common.serialization.ByteBufferDeserializer; import org.apache.kafka.common.serialization.Deserializer; import titan.ccp.configuration.events.Event; +/** + * Deserializer for Events(SensorRegistry changes). + */ public class EventDeserializer implements Deserializer<Event> { private final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer(); diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java index f7285a5895bef774905239d6f35c524429afe88f..afa9440b31ac01eb9abed5c8bcdb2c4674dca4c4 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java @@ -1,12 +1,12 @@ package serialization; - import application.SensorParentKey; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -16,9 +16,11 @@ import org.apache.kafka.common.serialization.Serde; * Wrapper Class that encapsulates a SensorParentKey Serde in a org.apache.beam.sdk.coders.Coder. */ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Serializable { - private static final long serialVersionUID = -3480141901035692398L; - private transient Serde<application.SensorParentKey> innerSerde = SensorParentKeySerde.serde(); + private static final boolean DETERMINISTIC = true; + private static final int VALUE_SIZE = 4; + + private transient Serde<application.SensorParentKey> innerSerde = SensorParentKeySerde.serde(); @Override public void encode(final SensorParentKey value, final OutputStream outStream) @@ -28,7 +30,7 @@ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Seri } final byte[] bytes = this.innerSerde.serializer().serialize("ser", value); - final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array(); + final byte[] sizeinBytes = ByteBuffer.allocate(VALUE_SIZE).putInt(bytes.length).array(); outStream.write(sizeinBytes); outStream.write(bytes); @@ -40,7 +42,7 @@ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Seri this.innerSerde = SensorParentKeySerde.serde(); } - final byte[] sizeinBytes = new byte[4]; + final byte[] sizeinBytes = new byte[VALUE_SIZE]; inStream.read(sizeinBytes); final int size = ByteBuffer.wrap(sizeinBytes).getInt(); final byte[] bytes = new byte[size]; @@ -51,12 +53,14 @@ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Seri @Override public List<? extends Coder<?>> getCoderArguments() { - return null; + return Collections.emptyList(); } @Override public void verifyDeterministic() throws NonDeterministicException { - + if (!DETERMINISTIC) { + throw new NonDeterministicException(this, "This class should be deterministic!"); + } } }