Skip to content
Snippets Groups Projects
DuplicateAsFlatMap.java 2.5 KiB
Newer Older
package application;

import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import titan.ccp.model.records.ActivePowerRecord;


/**
 * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
public class DuplicateAsFlatMap
    extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
Lorenz Boguhn's avatar
Lorenz Boguhn committed
  private static final long serialVersionUID = -5132355515723961647L;

  private static final String STATE_STORE_NAME = "DuplicateParents";

  @StateId(STATE_STORE_NAME)
Sören Henning's avatar
Sören Henning committed
  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD

  private final PCollectionView<Map<String, Set<String>>> childParentPairMap;

  public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
    super();
    this.childParentPairMap = childParentPairMap;
  }

  /**
   * Generate a KV-pair for every child-parent match.
  public void processElement(
      @Element final KV<String, ActivePowerRecord> kv,
      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
      @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
      final ProcessContext c) {

    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
    final Set<String> newParents =
        c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
            ? Collections.emptySet()
            : c.sideInput(this.childParentPairMap).get(kv.getKey());
    final Set<String> oldParents =
        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
    // Forward new Pairs if they exist
    if (!newParents.isEmpty()) {
      for (final String parent : newParents) {

        // Forward flat mapped record
        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
        out.output(KV.of(key, record));
      }
    }
    if (!newParents.equals(oldParents)) {
      for (final String oldParent : oldParents) {
        if (!newParents.contains(oldParent)) {
          // Forward Delete
          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
          out.output(KV.of(key, null));
        }
      }
      state.write(newParents);
    }
  }
}