Skip to content
Snippets Groups Projects
Commit 72d2186b authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Externalize DuplicateAsFlatMap uc4-beam-samza

parent b32a44dd
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package application;
import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Duplicates the data as a flat map.
*/
public class DuplicateAsFlatMap extends DoFn
<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
super();
this.childParentPairMap = childParentPairMap;
}
/**
* Generate a KV-pair for every child-parent match.
*/
@ProcessElement
public void processElement(@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
final Set<String> newParents =
c.sideInput(childParentPairMap).get(kv.getKey()) == null
? Collections.emptySet()
: c.sideInput(childParentPairMap).get(kv.getKey());
final Set<String> oldParents =
MoreObjects.firstNonNull(state.read(), Collections.emptySet());
// Forward new Pairs if they exist
if (!newParents.isEmpty()) {
for (final String parent : newParents) {
// Forward flat mapped record
final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
out.output(KV.of(key, record));
}
}
if (!newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
// Forward Delete
final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
out.output(KV.of(key, null));
}
}
state.write(newParents);
}
}
}
package application;
import com.google.common.base.MoreObjects;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
......@@ -17,11 +15,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Latest;
......@@ -66,7 +60,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
* persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
* under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
*/
public class Uc4ApplicationBeam {
final public class Uc4ApplicationBeam {
private static final String JOB_NAME = "Uc4Application";
private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
private static final String INPUT = "INPUT";
......@@ -285,46 +279,7 @@ public class Uc4ApplicationBeam {
final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
inputCollection.apply(
"Duplicate as flatMap",
ParDo.of(
new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
// Generate a KV-pair for every child-parent match
@ProcessElement
public void processElement(@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
final Set<String> newParents =
c.sideInput(childParentPairMap).get(kv.getKey()) == null
? Collections.emptySet()
: c.sideInput(childParentPairMap).get(kv.getKey());
final Set<String> oldParents =
MoreObjects.firstNonNull(state.read(), Collections.emptySet());
// Forward new Pairs if they exist
if (!newParents.isEmpty()) {
for (final String parent : newParents) {
// Forward flat mapped record
final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
out.output(KV.of(key, record));
}
}
if (!newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
// Forward Delete
final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
out.output(KV.of(key, null));
}
}
state.write(newParents);
}
}
}).withSideInputs(childParentPairMap))
ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
.apply("Filter only latest changes", Latest.perKey())
.apply("Filter out null values",
Filter.by(
......@@ -334,6 +289,7 @@ public class Uc4ApplicationBeam {
return kv.getValue() != null;
}
}));
// Aggregate for every sensor group of the current level
final PCollection<KV<String, AggregatedActivePowerRecord>>
aggregations = flatMappedValues
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment