Skip to content
Snippets Groups Projects

Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1

Merged Sören Henning requested to merge upgrade-kstreams into upgrade-flink
2 files
+ 15
10
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -14,13 +14,17 @@ import titan.ccp.model.records.ActivePowerRecord;
/**
* Duplicates the Kv containing the (Children,Parents) pair as a flat map.
* Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
*/
public class DuplicateAsFlatMap
extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
private static final long serialVersionUID = -5132355515723961647L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD
private static final String STATE_STORE_NAME = "DuplicateParents";
@StateId(STATE_STORE_NAME)
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
@@ -28,7 +32,6 @@ public class DuplicateAsFlatMap
this.childParentPairMap = childParentPairMap;
}
/**
* Generate a KV-pair for every child-parent match.
*/
@@ -36,7 +39,7 @@ public class DuplicateAsFlatMap
public void processElement(
@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
@StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
Loading