Skip to content
Snippets Groups Projects
DuplicateAsFlatMap.java 2.46 KiB
Newer Older
package application;

import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import titan.ccp.model.records.ActivePowerRecord;


/**
Lorenz Boguhn's avatar
Lorenz Boguhn committed
 * Duplicates the Kv containing the (Children,Parents) pair as a flat map.
 */
public class DuplicateAsFlatMap extends DoFn
    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
Lorenz Boguhn's avatar
Lorenz Boguhn committed
  private static final long serialVersionUID = -5132355515723961647L;
  @StateId("parents")
Lorenz Boguhn's avatar
Lorenz Boguhn committed
  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD
  private final PCollectionView<Map<String, Set<String>>> childParentPairMap;

  public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
    super();
    this.childParentPairMap = childParentPairMap;
  }


  /**
   *  Generate a KV-pair for every child-parent match.
   */
  @ProcessElement
  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
                             final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
                             @StateId("parents") final ValueState<Set<String>> state,
                             final ProcessContext c) {

    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
    final Set<String> newParents =
        c.sideInput(childParentPairMap).get(kv.getKey()) == null
            ? Collections.emptySet()
            : c.sideInput(childParentPairMap).get(kv.getKey());
    final Set<String> oldParents =
        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
    // Forward new Pairs if they exist
    if (!newParents.isEmpty()) {
      for (final String parent : newParents) {

        // Forward flat mapped record
        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
        out.output(KV.of(key, record));
      }
    }
    if (!newParents.equals(oldParents)) {
      for (final String oldParent : oldParents) {
        if (!newParents.contains(oldParent)) {
          // Forward Delete
          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
          out.output(KV.of(key, null));
        }
      }
      state.write(newParents);
    }
  }
}