diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index f720b1ab557a11f8e5a20a36bc2b660588454e0d..d76c68c14f855b076f612cb894d0163213d830ee 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -161,7 +161,7 @@ public class AggregationServiceFlinkJob { try { env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true)); } catch (final IOException e) { - e.printStackTrace(); + LOGGER.error("Cannot create RocksDB state backend.", e); } } else { env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize)); @@ -177,7 +177,7 @@ public class AggregationServiceFlinkJob { new ImmutableSetSerializer()); env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(), new ImmutableSetSerializer()); - env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), + env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS new ImmutableSetSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers() @@ -188,14 +188,14 @@ public class AggregationServiceFlinkJob { // Build input stream final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource) - .name("[Kafka Consumer] Topic: " + inputTopic) + .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS .rebalance() .map(r -> r) .name("[Map] Rebalance Forward"); // Build aggregation stream final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource) - .name("[Kafka Consumer] Topic: " + outputTopic) + .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS .rebalance() .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())) .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord"); @@ -210,11 +210,10 @@ public class AggregationServiceFlinkJob { // Build parent sensor stream from configuration stream final DataStream<Tuple2<String, Set<String>>> configurationsStream = env.addSource(kafkaConfigSource) - .name("[Kafka Consumer] Topic: " + configurationTopic) + .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED || tuple.f0 == Event.SENSOR_REGISTRY_STATUS) .name("[Filter] SensorRegistry changed") - // Tuple2<Event, String> -> SensorRegistry .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry") .keyBy(sr -> 1) .flatMap(new ChildParentsFlatMapFunction()) @@ -227,28 +226,9 @@ public class AggregationServiceFlinkJob { .flatMap(new JoinAndDuplicateCoFlatMapFunction()) .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)"); - // KeyedStream<ActivePowerRecord, String> keyedStream = - // mergedInputStream.keyBy(ActivePowerRecord::getIdentifier); - // - // MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor = - // new MapStateDescriptor<>( - // "join-and-duplicate-state", - // BasicTypeInfo.STRING_TYPE_INFO, - // TypeInformation.of(new TypeHint<Set<String>>() {})); - // - // BroadcastStream<Tuple2<String, Set<String>>> broadcastStream = - // configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor); - // - // DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream = - // keyedStream.connect(broadcastStream) - // .process(new JoinAndDuplicateKeyedBroadcastProcessFunction()); - if (debug) { lastValueStream - .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + "ActivePowerRecord {" - + "identifier: " + t.f1.getIdentifier() + ", " - + "timestamp: " + t.f1.getTimestamp() + ", " - + "valueInW: " + t.f1.getValueInW() + " }") + .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1) .print(); } @@ -262,7 +242,6 @@ public class AggregationServiceFlinkJob { // add Kafka Sink aggregationStream - // AggregatedActivePowerRecord -> Tuple2<String, AggregatedActivePowerRecord> .map(value -> new Tuple2<>(value.getIdentifier(), value)) .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)") .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))) diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java index a7e5ac28ff08c17bbd7911c02cde7bee1316c823..910dc359fa9b5b0810f7f9b6e67bfceaa68cc798 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java @@ -1,5 +1,10 @@ package theodolite.uc4.application; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -12,36 +17,31 @@ import titan.ccp.model.sensorregistry.AggregatedSensor; import titan.ccp.model.sensorregistry.Sensor; import titan.ccp.model.sensorregistry.SensorRegistry; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - /** * Transforms a {@link SensorRegistry} into key value pairs of Sensor identifiers and their parents' * sensor identifiers. All pairs whose sensor's parents have changed since last iteration are * forwarded. A mapping of an identifier to <code>null</code> means that the corresponding sensor * does not longer exists in the sensor registry. */ -public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> { +public class ChildParentsFlatMapFunction + extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> { - private static final long serialVersionUID = 3969444219510915221L; //NOPMD + private static final long serialVersionUID = 3969444219510915221L; // NOPMD private transient MapState<String, Set<String>> state; @Override - public void open(Configuration parameters) { - MapStateDescriptor<String, Set<String>> descriptor = - new MapStateDescriptor<String, Set<String>>( + public void open(final Configuration parameters) { + final MapStateDescriptor<String, Set<String>> descriptor = + new MapStateDescriptor<>( "child-parents-state", - TypeInformation.of(new TypeHint<String>(){}), - TypeInformation.of(new TypeHint<Set<String>>(){})); - this.state = getRuntimeContext().getMapState(descriptor); + TypeInformation.of(new TypeHint<String>() {}), + TypeInformation.of(new TypeHint<Set<String>>() {})); + this.state = this.getRuntimeContext().getMapState(descriptor); } @Override - public void flatMap(SensorRegistry value, Collector<Tuple2<String, Set<String>>> out) + public void flatMap(final SensorRegistry value, final Collector<Tuple2<String, Set<String>>> out) throws Exception { final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(value); this.updateChildParentsPairs(childParentsPairs); @@ -71,7 +71,7 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis } private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs) - throws Exception { + throws Exception { // NOPMD General exception thown by Flink final Iterator<Map.Entry<String, Set<String>>> oldChildParentsPairs = this.state.iterator(); while (oldChildParentsPairs.hasNext()) { final Map.Entry<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next(); @@ -89,7 +89,8 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis } } - private void updateState(final Map<String, Set<String>> childParentsPairs) throws Exception { + private void updateState(final Map<String, Set<String>> childParentsPairs) + throws Exception { // NOPMD General exception thown by Flink for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) { if (childParentPair.getValue() == null) { this.state.remove(childParentPair.getKey()); diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java index dec7b417b2683f95f363547fd4f76acf49195c4d..6ef9a72e9695cfccba0bbcca1238f7ebc94fc505 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java @@ -12,6 +12,12 @@ import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; import titan.ccp.model.records.ActivePowerRecord; +/** + * A {@link RichCoFlatMapFunction} which joins each incoming {@link ActivePowerRecord} with its + * corresponding parents. The {@link ActivePowerRecord} is duplicated for each parent. When + * receiving a new set of parents for a sensor, this operator updates its internal state and + * forwards "tombstone" record if a sensor does no longer have a certain parent. + */ public class JoinAndDuplicateCoFlatMapFunction extends RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS @@ -44,17 +50,17 @@ public class JoinAndDuplicateCoFlatMapFunction extends @Override public void flatMap2(final Tuple2<String, Set<String>> value, final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception { - final Set<String> oldParents = this.state.get(value.f0); - if (oldParents != null) { - final Set<String> newParents = value.f1; - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null)); - } + final String sensor = value.f0; + final Set<String> oldParents = this.state.get(sensor); + final Set<String> newParents = value.f1; + if (oldParents != null && !newParents.equals(oldParents)) { + for (final String oldParent : oldParents) { + if (!newParents.contains(oldParent)) { + // Parent was deleted, emit tombstone record + out.collect(new Tuple2<>(new SensorParentKey(sensor, oldParent), null)); } } } - this.state.put(value.f0, value.f1); + this.state.put(sensor, newParents); } } diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java deleted file mode 100644 index 96711b2f09ad9fd6b0b2b3f98687acbf2e4c8c68..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java +++ /dev/null @@ -1,58 +0,0 @@ -package theodolite.uc4.application; - -import java.util.Set; -import org.apache.flink.api.common.state.BroadcastState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; -import org.apache.flink.util.Collector; -import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.model.records.ActivePowerRecord; - -public class JoinAndDuplicateKeyedBroadcastProcessFunction extends - KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS - - private static final long serialVersionUID = -4525438547262992821L; // NOPMD - - private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor = - new MapStateDescriptor<>( - "join-and-duplicate-state", - BasicTypeInfo.STRING_TYPE_INFO, - TypeInformation.of(new TypeHint<Set<String>>() {})); - - @Override - public void processElement(final ActivePowerRecord value, final ReadOnlyContext ctx, - final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception { - final Set<String> parents = - ctx.getBroadcastState(this.sensorConfigStateDescriptor).get(value.getIdentifier()); - if (parents == null) { - return; - } - for (final String parent : parents) { - out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value)); - } - } - - @Override - public void processBroadcastElement(final Tuple2<String, Set<String>> value, final Context ctx, - final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception { - final BroadcastState<String, Set<String>> state = - ctx.getBroadcastState(this.sensorConfigStateDescriptor); - final Set<String> oldParents = state.get(value.f0); - if (oldParents != null) { - final Set<String> newParents = value.f1; - if (!newParents.equals(oldParents)) { - for (final String oldParent : oldParents) { - if (!newParents.contains(oldParent)) { - out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null)); - } - } - } - } - state.put(value.f0, value.f1); - } - -} diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java index e4b545174861a70a49a0955f95b5bd7e14b2dfb6..45d4a09d153881572c949d2af7542f9cffb5622d 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java @@ -15,6 +15,11 @@ import theodolite.uc4.application.util.SensorParentKey; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * A {@link ProcessWindowFunction} which performs the windowed aggregation of all + * {@link ActivePowerRecord} for the same {@link SensorParentKey}. Result of this aggregation is an + * {@link AggregatedActivePowerRecord}. + */ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { // NOCS @@ -40,7 +45,9 @@ public class RecordAggregationProcessWindowFunction extends } @Override - public void process(final String key, final Context context, + public void process( + final String key, + final Context context, final Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements, final Collector<AggregatedActivePowerRecord> out) throws Exception { for (final Tuple2<SensorParentKey, ActivePowerRecord> t : elements) { diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java index 927006fabd56ab6c24532cd71184e6f7c20094ac..e157f35c8a052d2d4a28526a0d98d56515d586d6 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java @@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output; import java.io.Serializable; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; +/** + * A {@link Serializer} for {@link ImmutableSensorRegistry}s. + */ public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry> implements Serializable { diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java index 913b07aaafcaad3b5247fd4bf13ec64df3469312..6b2dbcdfb403705b39815dd31112deab7947d83d 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java @@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output; import java.io.Serializable; import java.util.Set; +/** + * A {@link Serializer} for serializing arbitrary {@link Set}s of {@link Object}s. + */ public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable { private static final long serialVersionUID = 6919877826110724620L; // NOPMD