-
Lorenz Boguhn authored
additional naive upgrade to Titan Control Center Commons 0.1.0 (new ActivePowerRecords,...) git commit -m Migrate
Lorenz Boguhn authoredadditional naive upgrade to Titan Control Center Commons 0.1.0 (new ActivePowerRecords,...) git commit -m Migrate
UpdateChildParentPairs.java 947 B
package application;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
/**
* Forward changes or tombstone values for deleted records
*/
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
private static final long serialVersionUID = 1L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents =
StateSpecs.value();
@ProcessElement
public void processElement(@Element final KV<String, Set<String>> kv,
final OutputReceiver<KV<String, Set<String>>> out,
@StateId("parents") final ValueState<Set<String>> state) {
if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
out.output(kv);
state.write(kv.getValue());
}
}
}