From 0254266ad5a70a32174899ebad13df82a77c8cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Mon, 31 Jan 2022 18:33:46 +0100 Subject: [PATCH] Set indivdual state store names, fix #335 --- .../main/java/application/DuplicateAsFlatMap.java | 13 ++++++++----- .../java/application/UpdateChildParentPairs.java | 12 +++++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java index cf25f043e..16e47f6e1 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java @@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord; /** - * Duplicates the Kv containing the (Children,Parents) pair as a flat map. + * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap. */ public class DuplicateAsFlatMap extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { + private static final long serialVersionUID = -5132355515723961647L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD + + private static final String STATE_STORE_NAME = "DuplicateParents"; + + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); private final PCollectionView<Map<String, Set<String>>> childParentPairMap; public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) { @@ -28,7 +32,6 @@ public class DuplicateAsFlatMap this.childParentPairMap = childParentPairMap; } - /** * Generate a KV-pair for every child-parent match. */ @@ -36,7 +39,7 @@ public class DuplicateAsFlatMap public void processElement( @Element final KV<String, ActivePowerRecord> kv, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state, final ProcessContext c) { final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java index 8692be5ae..4a689d3b8 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java @@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV; */ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> { + private static final String STATE_STORE_NAME = "UpdateParents"; + private static final long serialVersionUID = 1L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = // NOPMD - StateSpecs.value(); + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); /** * Match the changes accordingly. @@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str * @param kv the sensor parents set that contains the changes. */ @ProcessElement - public void processElement(@Element final KV<String, Set<String>> kv, + public void processElement( + @Element final KV<String, Set<String>> kv, final OutputReceiver<KV<String, Set<String>>> out, - @StateId("parents") final ValueState<Set<String>> state) { + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) { if (kv.getValue() == null || !kv.getValue().equals(state.read())) { out.output(kv); state.write(kv.getValue()); -- GitLab