From 9802b6f4c33828e67928f5373fa537d47e30bdbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 11 Mar 2021 17:43:34 +0100 Subject: [PATCH] Improve code quality --- .../AggregationServiceFlinkJob.java | 33 ++--------- .../ChildParentsFlatMapFunction.java | 35 +++++------ .../JoinAndDuplicateCoFlatMapFunction.java | 24 +++++--- ...uplicateKeyedBroadcastProcessFunction.java | 58 ------------------- ...ecordAggregationProcessWindowFunction.java | 9 ++- .../ImmutableSensorRegistrySerializer.java | 3 + .../util/ImmutableSetSerializer.java | 3 + 7 files changed, 53 insertions(+), 112 deletions(-) delete mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index f720b1ab5..d76c68c14 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -161,7 +161,7 @@ public class AggregationServiceFlinkJob { try { env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true)); } catch (final IOException e) { - e.printStackTrace(); + LOGGER.error("Cannot create RocksDB state backend.", e); } } else { env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize)); @@ -177,7 +177,7 @@ public class AggregationServiceFlinkJob { new ImmutableSetSerializer()); env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(), new ImmutableSetSerializer()); - env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), + 
env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS new ImmutableSetSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers() @@ -188,14 +188,14 @@ public class AggregationServiceFlinkJob { // Build input stream final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource) - .name("[Kafka Consumer] Topic: " + inputTopic) + .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS .rebalance() .map(r -> r) .name("[Map] Rebalance Forward"); // Build aggregation stream final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource) - .name("[Kafka Consumer] Topic: " + outputTopic) + .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS .rebalance() .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())) .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord"); @@ -210,11 +210,10 @@ public class AggregationServiceFlinkJob { // Build parent sensor stream from configuration stream final DataStream<Tuple2<String, Set<String>>> configurationsStream = env.addSource(kafkaConfigSource) - .name("[Kafka Consumer] Topic: " + configurationTopic) + .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED || tuple.f0 == Event.SENSOR_REGISTRY_STATUS) .name("[Filter] SensorRegistry changed") - // Tuple2<Event, String> -> SensorRegistry .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry") .keyBy(sr -> 1) .flatMap(new ChildParentsFlatMapFunction()) @@ -227,28 +226,9 @@ public class AggregationServiceFlinkJob { .flatMap(new JoinAndDuplicateCoFlatMapFunction()) .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)"); - // KeyedStream<ActivePowerRecord, String> keyedStream = - // mergedInputStream.keyBy(ActivePowerRecord::getIdentifier); - // - // MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor = - // new 
MapStateDescriptor<>( - // "join-and-duplicate-state", - // BasicTypeInfo.STRING_TYPE_INFO, - // TypeInformation.of(new TypeHint<Set<String>>() {})); - // - // BroadcastStream<Tuple2<String, Set<String>>> broadcastStream = - // configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor); - // - // DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream = - // keyedStream.connect(broadcastStream) - // .process(new JoinAndDuplicateKeyedBroadcastProcessFunction()); - if (debug) { lastValueStream - .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + "ActivePowerRecord {" - + "identifier: " + t.f1.getIdentifier() + ", " - + "timestamp: " + t.f1.getTimestamp() + ", " - + "valueInW: " + t.f1.getValueInW() + " }") + .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1) .print(); } @@ -262,7 +242,6 @@ public class AggregationServiceFlinkJob { // add Kafka Sink aggregationStream - // AggregatedActivePowerRecord -> Tuple2<String, AggregatedActivePowerRecord> .map(value -> new Tuple2<>(value.getIdentifier(), value)) .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)") .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))) diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java index a7e5ac28f..910dc359f 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java @@ -1,5 +1,10 @@ package theodolite.uc4.application; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import 
org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -12,36 +17,31 @@ import titan.ccp.model.sensorregistry.AggregatedSensor; import titan.ccp.model.sensorregistry.Sensor; import titan.ccp.model.sensorregistry.SensorRegistry; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - /** * Transforms a {@link SensorRegistry} into key value pairs of Sensor identifiers and their parents' * sensor identifiers. All pairs whose sensor's parents have changed since last iteration are * forwarded. A mapping of an identifier to <code>null</code> means that the corresponding sensor * does not longer exists in the sensor registry. */ -public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> { +public class ChildParentsFlatMapFunction + extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> { - private static final long serialVersionUID = 3969444219510915221L; //NOPMD + private static final long serialVersionUID = 3969444219510915221L; // NOPMD private transient MapState<String, Set<String>> state; @Override - public void open(Configuration parameters) { - MapStateDescriptor<String, Set<String>> descriptor = - new MapStateDescriptor<String, Set<String>>( + public void open(final Configuration parameters) { + final MapStateDescriptor<String, Set<String>> descriptor = + new MapStateDescriptor<>( "child-parents-state", - TypeInformation.of(new TypeHint<String>(){}), - TypeInformation.of(new TypeHint<Set<String>>(){})); - this.state = getRuntimeContext().getMapState(descriptor); + TypeInformation.of(new TypeHint<String>() {}), + TypeInformation.of(new TypeHint<Set<String>>() {})); + this.state = this.getRuntimeContext().getMapState(descriptor); } @Override - public void flatMap(SensorRegistry value, 
Collector<Tuple2<String, Set<String>>> out) + public void flatMap(final SensorRegistry value, final Collector<Tuple2<String, Set<String>>> out) throws Exception { final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(value); this.updateChildParentsPairs(childParentsPairs); @@ -71,7 +71,7 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis } private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs) - throws Exception { + throws Exception { // NOPMD General exception thrown by Flink final Iterator<Map.Entry<String, Set<String>>> oldChildParentsPairs = this.state.iterator(); while (oldChildParentsPairs.hasNext()) { final Map.Entry<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next(); @@ -89,7 +89,8 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis } } - private void updateState(final Map<String, Set<String>> childParentsPairs) throws Exception { + private void updateState(final Map<String, Set<String>> childParentsPairs) + throws Exception { // NOPMD General exception thrown by Flink for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) { if (childParentPair.getValue() == null) { this.state.remove(childParentPair.getKey()); diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java index dec7b417b..6ef9a72e9 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java @@ -12,6 +12,12 @@ import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; import 
titan.ccp.model.records.ActivePowerRecord; +/** + * A {@link RichCoFlatMapFunction} which joins each incoming {@link ActivePowerRecord} with its + * corresponding parents. The {@link ActivePowerRecord} is duplicated for each parent. When + * receiving a new set of parents for a sensor, this operator updates its internal state and + * forwards a "tombstone" record if a sensor no longer has a certain parent. + */ public class JoinAndDuplicateCoFlatMapFunction extends RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS @@ -44,17 +50,17 @@ public class JoinAndDuplicateCoFlatMapFunction extends @Override public void flatMap2(final Tuple2<String, Set<String>> value, final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception { - final Set<String> oldParents = this.state.get(value.f0); - if (oldParents != null) { - final Set<String> newParents = value.f1; - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null)); - } + final String sensor = value.f0; + final Set<String> oldParents = this.state.get(sensor); + final Set<String> newParents = value.f1; + if (oldParents != null && !newParents.equals(oldParents)) { + for (final String oldParent : oldParents) { + if (!newParents.contains(oldParent)) { + // Parent was deleted, emit tombstone record + out.collect(new Tuple2<>(new SensorParentKey(sensor, oldParent), null)); } } } - this.state.put(value.f0, value.f1); + this.state.put(sensor, newParents); } } diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java deleted file mode 100644 index 96711b2f0..000000000 
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java +++ /dev/null @@ -1,58 +0,0 @@ -package theodolite.uc4.application; - -import java.util.Set; -import org.apache.flink.api.common.state.BroadcastState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; -import org.apache.flink.util.Collector; -import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.model.records.ActivePowerRecord; - -public class JoinAndDuplicateKeyedBroadcastProcessFunction extends - KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS - - private static final long serialVersionUID = -4525438547262992821L; // NOPMD - - private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor = - new MapStateDescriptor<>( - "join-and-duplicate-state", - BasicTypeInfo.STRING_TYPE_INFO, - TypeInformation.of(new TypeHint<Set<String>>() {})); - - @Override - public void processElement(final ActivePowerRecord value, final ReadOnlyContext ctx, - final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception { - final Set<String> parents = - ctx.getBroadcastState(this.sensorConfigStateDescriptor).get(value.getIdentifier()); - if (parents == null) { - return; - } - for (final String parent : parents) { - out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value)); - } - } - - @Override - public void processBroadcastElement(final Tuple2<String, Set<String>> value, final Context ctx, - final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) 
throws Exception { - final BroadcastState<String, Set<String>> state = - ctx.getBroadcastState(this.sensorConfigStateDescriptor); - final Set<String> oldParents = state.get(value.f0); - if (oldParents != null) { - final Set<String> newParents = value.f1; - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null)); - } - } - } - } - state.put(value.f0, value.f1); - } - -} diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java index e4b545174..45d4a09d1 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java @@ -15,6 +15,11 @@ import theodolite.uc4.application.util.SensorParentKey; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * A {@link ProcessWindowFunction} which performs the windowed aggregation of all + * {@link ActivePowerRecord} for the same {@link SensorParentKey}. Result of this aggregation is an + * {@link AggregatedActivePowerRecord}. 
+ */ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { // NOCS @@ -40,7 +45,9 @@ public class RecordAggregationProcessWindowFunction extends } @Override - public void process(final String key, final Context context, + public void process( + final String key, + final Context context, final Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements, final Collector<AggregatedActivePowerRecord> out) throws Exception { for (final Tuple2<SensorParentKey, ActivePowerRecord> t : elements) { diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java index 927006fab..e157f35c8 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java @@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output; import java.io.Serializable; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; +/** + * A {@link Serializer} for {@link ImmutableSensorRegistry}s. 
+ */ public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry> implements Serializable { diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java index 913b07aaa..6b2dbcdfb 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java @@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output; import java.io.Serializable; import java.util.Set; +/** + * A {@link Serializer} for serializing arbitrary {@link Set}s of {@link Object}s. + */ public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable { private static final long serialVersionUID = 6919877826110724620L; // NOPMD -- GitLab