Skip to content
Snippets Groups Projects
Commit c045452e authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into benchmark-smoke-tests

parents 932ae9e1 47ec6dbc
No related branches found
No related tags found
1 merge request!232Add smoke tests for benchmark
......@@ -12,8 +12,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
*/
public class AbstractPipeline extends Pipeline {
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader";
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url";
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
protected final String inputTopic;
protected final String bootstrapServer;
// Application Configurations
......
......
......@@ -14,13 +14,18 @@ import titan.ccp.model.records.ActivePowerRecord;
/**
* Duplicates the Kv containing the (Children,Parents) pair as a flat map.
* Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
*/
public class DuplicateAsFlatMap
extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
private static final long serialVersionUID = -5132355515723961647L;
@StateId("parents")
private static final String STATE_STORE_NAME = "DuplicateParents";
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD
private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
......@@ -28,7 +33,6 @@ public class DuplicateAsFlatMap
this.childParentPairMap = childParentPairMap;
}
/**
* Generate a KV-pair for every child-parent match.
*/
......@@ -36,7 +40,7 @@ public class DuplicateAsFlatMap
public void processElement(
@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
......
......
......@@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV;
*/
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
private static final String STATE_STORE_NAME = "UpdateParents";
private static final long serialVersionUID = 1L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = // NOPMD
StateSpecs.value();
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD
/**
* Match the changes accordingly.
......@@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
* @param kv the sensor parents set that contains the changes.
*/
@ProcessElement
public void processElement(@Element final KV<String, Set<String>> kv,
public void processElement(
@Element final KV<String, Set<String>> kv,
final OutputReceiver<KV<String, Set<String>>> out,
@StateId("parents") final ValueState<Set<String>> state) {
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) {
if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
out.output(kv);
state.write(kv.getValue());
......
......
......@@ -44,7 +44,7 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
return List.of();
}
@Override
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment