Newer
Older
package application;
import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
public class DuplicateAsFlatMap
extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
private static final long serialVersionUID = -5132355515723961647L;
private static final String STATE_STORE_NAME = "DuplicateParents";
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD
private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
super();
this.childParentPairMap = childParentPairMap;
}
/**
* Generate a KV-pair for every child-parent match.
public void processElement(
@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
final Set<String> newParents =
c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
: c.sideInput(this.childParentPairMap).get(kv.getKey());
final Set<String> oldParents =
MoreObjects.firstNonNull(state.read(), Collections.emptySet());
// Forward new Pairs if they exist
if (!newParents.isEmpty()) {
for (final String parent : newParents) {
// Forward flat mapped record
final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
out.output(KV.of(key, record));
}
}
if (!newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
// Forward Delete
final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
out.output(KV.of(key, null));
}
}
state.write(newParents);
}
}
}