diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index e67c5f60b6401b4ecd1f42b2a184afbc4654f425..3f04bf4373aab0394ff4574b4020065ac356724b 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -12,8 +12,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; */ public class AbstractPipeline extends Pipeline { - private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; - private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; + private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD + private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD + protected final String inputTopic; protected final String bootstrapServer; // Application Configurations diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java index cf25f043ecef8c4bc564cff454d6477d69c945aa..347d76dfb3d1d1f09f1091296a322a23bba67ec0 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java @@ -14,13 +14,18 @@ import titan.ccp.model.records.ActivePowerRecord; /** - * Duplicates the Kv containing the (Children,Parents) pair as a flat map. + * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap. */ public class DuplicateAsFlatMap extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> { + private static final long serialVersionUID = -5132355515723961647L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD + + private static final String STATE_STORE_NAME = "DuplicateParents"; + + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD + private final PCollectionView<Map<String, Set<String>>> childParentPairMap; public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) { @@ -28,7 +33,6 @@ public class DuplicateAsFlatMap this.childParentPairMap = childParentPairMap; } - /** * Generate a KV-pair for every child-parent match. */ @@ -36,7 +40,7 @@ public class DuplicateAsFlatMap public void processElement( @Element final KV<String, ActivePowerRecord> kv, final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out, - @StateId("parents") final ValueState<Set<String>> state, + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state, final ProcessContext c) { final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue(); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java index 8692be5ae6637ebda86f10d66b43c6071264e099..cff04e132a93f6c8098c3039232dd48084e6d264 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java @@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV; */ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> { + private static final String STATE_STORE_NAME = "UpdateParents"; + private static final long serialVersionUID = 1L; - @StateId("parents") - private final StateSpec<ValueState<Set<String>>> parents = // NOPMD - StateSpecs.value(); + @StateId(STATE_STORE_NAME) + private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD /** * Match the changes accordingly. @@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str * @param kv the sensor parents set that contains the changes. */ @ProcessElement - public void processElement(@Element final KV<String, Set<String>> kv, + public void processElement( + @Element final KV<String, Set<String>> kv, final OutputReceiver<KV<String, Set<String>>> out, - @StateId("parents") final ValueState<Set<String>> state) { + @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) { if (kv.getValue() == null || !kv.getValue().equals(state.read())) { out.output(kv); state.write(kv.getValue()); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java index 28f3d70b4a19756454fe4564b6f85599e7f17777..3e0be0fa456efa3ec67504ea9d0e285ae8b3b913 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java @@ -44,7 +44,7 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe @Override public List<? extends Coder<?>> getCoderArguments() { - return null; + return List.of(); } @Override