Skip to content
Snippets Groups Projects
Commit 0254266a authored by Sören Henning's avatar Sören Henning
Browse files

Set indivdual state store names, fix #335

parent 3bdc9c14
No related branches found
No related tags found
No related merge requests found
Pipeline #6344 failed
......@@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord;
/**
* Duplicates the Kv containing the (Children,Parents) pair as a flat map.
* Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
*/
public class DuplicateAsFlatMap
extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
private static final long serialVersionUID = -5132355515723961647L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD
private static final String STATE_STORE_NAME = "DuplicateParents";
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
......@@ -28,7 +32,6 @@ public class DuplicateAsFlatMap
this.childParentPairMap = childParentPairMap;
}
/**
* Generate a KV-pair for every child-parent match.
*/
......@@ -36,7 +39,7 @@ public class DuplicateAsFlatMap
public void processElement(
@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
......
......@@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV;
*/
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
private static final String STATE_STORE_NAME = "UpdateParents";
private static final long serialVersionUID = 1L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = // NOPMD
StateSpecs.value();
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
/**
* Match the changes accordingly.
......@@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
* @param kv the sensor parents set that contains the changes.
*/
@ProcessElement
public void processElement(@Element final KV<String, Set<String>> kv,
public void processElement(
@Element final KV<String, Set<String>> kv,
final OutputReceiver<KV<String, Set<String>>> out,
@StateId("parents") final ValueState<Set<String>> state) {
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) {
if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
out.output(kv);
state.write(kv.getValue());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment