Skip to content
Snippets Groups Projects
Commit d93e9e8d authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into upgrade-beam

parents c1ac5e5b 0254266a
No related branches found
No related tags found
1 merge request!223Upgrade Beam version
Pipeline #6348 failed
...@@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Duplicates the Kv containing the (Children,Parents) pair as a flat map. * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
*/ */
public class DuplicateAsFlatMap public class DuplicateAsFlatMap
extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
private static final long serialVersionUID = -5132355515723961647L; private static final long serialVersionUID = -5132355515723961647L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD private static final String STATE_STORE_NAME = "DuplicateParents";
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
private final PCollectionView<Map<String, Set<String>>> childParentPairMap; private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) { public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
...@@ -28,7 +32,6 @@ public class DuplicateAsFlatMap ...@@ -28,7 +32,6 @@ public class DuplicateAsFlatMap
this.childParentPairMap = childParentPairMap; this.childParentPairMap = childParentPairMap;
} }
/** /**
* Generate a KV-pair for every child-parent match. * Generate a KV-pair for every child-parent match.
*/ */
...@@ -36,7 +39,7 @@ public class DuplicateAsFlatMap ...@@ -36,7 +39,7 @@ public class DuplicateAsFlatMap
public void processElement( public void processElement(
@Element final KV<String, ActivePowerRecord> kv, @Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state, @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
final ProcessContext c) { final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
......
...@@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV; ...@@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV;
*/ */
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> { public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
private static final String STATE_STORE_NAME = "UpdateParents";
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@StateId("parents") @StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = // NOPMD private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
StateSpecs.value();
/** /**
* Match the changes accordingly. * Match the changes accordingly.
...@@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str ...@@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
* @param kv the sensor parents set that contains the changes. * @param kv the sensor parents set that contains the changes.
*/ */
@ProcessElement @ProcessElement
public void processElement(@Element final KV<String, Set<String>> kv, public void processElement(
@Element final KV<String, Set<String>> kv,
final OutputReceiver<KV<String, Set<String>>> out, final OutputReceiver<KV<String, Set<String>>> out,
@StateId("parents") final ValueState<Set<String>> state) { @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) {
if (kv.getValue() == null || !kv.getValue().equals(state.read())) { if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
out.output(kv); out.output(kv);
state.write(kv.getValue()); state.write(kv.getValue());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment