diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
new file mode 100644
index 0000000000000000000000000000000000000000..ac5206db53434f3f7679664b0bd612d754f63525
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
@@ -0,0 +1,74 @@
+package application;
+
+import com.google.common.base.MoreObjects;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Duplicates each record once per parent of its sensor (flat map) and emits a {@code null}
+ * record for every parent the sensor no longer has.
+ */
+public class DuplicateAsFlatMap extends DoFn
+    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+  /** State cell with id "parents"; rewritten whenever the looked-up parent set changed. */
+  @StateId("parents")
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
+  /** Side input that is queried with the element key to obtain the current parents. */
+  private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
+
+  /**
+   * Creates the function.
+   *
+   * @param childParentPairMap side input view that is looked up by the key of each element to
+   *        obtain its current parents
+   */
+  public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
+    this.childParentPairMap = childParentPairMap;
+  }
+
+  /**
+   * Generate a KV-pair for every child-parent match.
+   *
+   * @param kv incoming element; its key is used for the side input lookup
+   * @param out receiver for the re-keyed records and the {@code null} deletions
+   * @param state parents written by an earlier invocation; an unset state counts as empty
+   * @param c context used to read the side input
+   */
+  @ProcessElement
+  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
+      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+      @StateId("parents") final ValueState<Set<String>> state,
+      final ProcessContext c) {
+
+    final String sensor = kv.getKey();
+    final ActivePowerRecord record = kv.getValue();
+    // Resolve the side input only once; a missing entry is treated as "no parents"
+    final Set<String> newParents = MoreObjects.firstNonNull(
+        c.sideInput(childParentPairMap).get(sensor), Collections.emptySet());
+    final Set<String> oldParents =
+        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
+
+    // Forward flat mapped record to every current parent (no-op for an empty set)
+    for (final String parent : newParents) {
+      out.output(KV.of(new SensorParentKey(sensor, parent), record));
+    }
+
+    if (!newParents.equals(oldParents)) {
+      for (final String oldParent : oldParents) {
+        if (!newParents.contains(oldParent)) {
+          // Forward Delete
+          out.output(KV.of(new SensorParentKey(sensor, oldParent), null));
+        }
+      }
+      state.write(newParents);
+    }
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
index 2bef6aab9d9b26320ba0619d952de24d5e56a000..7a9a8ca20c7d01c50e0df866ba900ac6682eae41 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
@@ -1,9 +1,7 @@
 package application;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.math.StatsAccumulator;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -17,11 +15,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Latest;
@@ -66,7 +60,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
  * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
  * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public class Uc4ApplicationBeam {
+public final class Uc4ApplicationBeam {
   private static final String JOB_NAME = "Uc4Application";
   private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
   private static final String INPUT = "INPUT";
@@ -285,46 +279,7 @@ public class Uc4ApplicationBeam {
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
             "Duplicate as flatMap",
-            ParDo.of(
-                new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
-                  @StateId("parents")
-                  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
-
-                  // Generate a KV-pair for every child-parent match
-                  @ProcessElement
-                  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
-                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
-                      @StateId("parents") final ValueState<Set<String>> state,
-                      final ProcessContext c) {
-                    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
-                    final Set<String> newParents =
-                        c.sideInput(childParentPairMap).get(kv.getKey()) == null
-                            ? Collections.emptySet()
-                            : c.sideInput(childParentPairMap).get(kv.getKey());
-                    final Set<String> oldParents =
-                        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
-                    // Forward new Pairs if they exist
-                    if (!newParents.isEmpty()) {
-                      for (final String parent : newParents) {
-
-                        // Forward flat mapped record
-                        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
-                        out.output(KV.of(key, record));
-                      }
-                    }
-                    if (!newParents.equals(oldParents)) {
-                      for (final String oldParent : oldParents) {
-                        if (!newParents.contains(oldParent)) {
-                          // Forward Delete
-                          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
-                          out.output(KV.of(key, null));
-                        }
-                      }
-                      state.write(newParents);
-                    }
-                  }
-                }).withSideInputs(childParentPairMap))
-
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
             .apply("Filter out null values",
                 Filter.by(
@@ -334,6 +289,7 @@ public class Uc4ApplicationBeam {
                       return kv.getValue() != null;
                     }
                   }));
+
     // Aggregate for every sensor group of the current level
     final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations =
         flatMappedValues