diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java index cf25f043ecef8c4bc564cff454d6477d69c945aa..16e47f6e1a82dd7d2c672986d67a65d7e008c20e 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java @@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord; /** - * Duplicates the Kv containing the (Children,Parents) pair as a flat map. + * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap. */ public class DuplicateAsFlatMap extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { + private static final long serialVersionUID = -5132355515723961647L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD + + private static final String STATE_STORE_NAME = "DuplicateParents"; + + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); private final PCollectionView<Map<String, Set<String>>> childParentPairMap; public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) { @@ -28,7 +32,6 @@ public class DuplicateAsFlatMap this.childParentPairMap = childParentPairMap; } - /** * Generate a KV-pair for every child-parent match. */ @@ -36,7 +39,7 @@ public class DuplicateAsFlatMap public void processElement( @Element final KV<String, ActivePowerRecord> kv, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state, final ProcessContext c) { final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java index 8692be5ae6637ebda86f10d66b43c6071264e099..4a689d3b801edf539a7a9890759c97550865ff26 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java @@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV; */ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> { + private static final String STATE_STORE_NAME = "UpdateParents"; + private static final long serialVersionUID = 1L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = // NOPMD - StateSpecs.value(); + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); /** * Match the changes accordingly. @@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str * @param kv the sensor parents set that contains the changes. */ @ProcessElement - public void processElement(@Element final KV<String, Set<String>> kv, + public void processElement( + @Element final KV<String, Set<String>> kv, final OutputReceiver<KV<String, Set<String>>> out, - @StateId("parents") final ValueState<Set<String>> state) { + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) { if (kv.getValue() == null || !kv.getValue().equals(state.read())) { out.output(kv); state.write(kv.getValue());