Skip to content
Snippets Groups Projects
Commit 9802b6f4 authored by Sören Henning's avatar Sören Henning
Browse files

Improve code quality

parent efc5a96b
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2253 failed
This commit is part of merge request !90. Comments created here will be created in the context of that merge request.
Showing
with 53 additions and 112 deletions
......@@ -161,7 +161,7 @@ public class AggregationServiceFlinkJob {
try {
env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
} catch (final IOException e) {
e.printStackTrace();
LOGGER.error("Cannot create RocksDB state backend.", e);
}
} else {
env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
......@@ -177,7 +177,7 @@ public class AggregationServiceFlinkJob {
new ImmutableSetSerializer());
env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
new ImmutableSetSerializer());
env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(),
env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
new ImmutableSetSerializer());
env.getConfig().getRegisteredTypesWithKryoSerializers()
......@@ -188,14 +188,14 @@ public class AggregationServiceFlinkJob {
// Build input stream
final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
.name("[Kafka Consumer] Topic: " + inputTopic)
.name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
.rebalance()
.map(r -> r)
.name("[Map] Rebalance Forward");
// Build aggregation stream
final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
.name("[Kafka Consumer] Topic: " + outputTopic)
.name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
.rebalance()
.map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
.name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
......@@ -210,11 +210,10 @@ public class AggregationServiceFlinkJob {
// Build parent sensor stream from configuration stream
final DataStream<Tuple2<String, Set<String>>> configurationsStream =
env.addSource(kafkaConfigSource)
.name("[Kafka Consumer] Topic: " + configurationTopic)
.name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS
.filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
|| tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
.name("[Filter] SensorRegistry changed")
// Tuple2<Event, String> -> SensorRegistry
.map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry")
.keyBy(sr -> 1)
.flatMap(new ChildParentsFlatMapFunction())
......@@ -227,28 +226,9 @@ public class AggregationServiceFlinkJob {
.flatMap(new JoinAndDuplicateCoFlatMapFunction())
.name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");
// KeyedStream<ActivePowerRecord, String> keyedStream =
// mergedInputStream.keyBy(ActivePowerRecord::getIdentifier);
//
// MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
// new MapStateDescriptor<>(
// "join-and-duplicate-state",
// BasicTypeInfo.STRING_TYPE_INFO,
// TypeInformation.of(new TypeHint<Set<String>>() {}));
//
// BroadcastStream<Tuple2<String, Set<String>>> broadcastStream =
// configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor);
//
// DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
// keyedStream.connect(broadcastStream)
// .process(new JoinAndDuplicateKeyedBroadcastProcessFunction());
if (debug) {
lastValueStream
.map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + "ActivePowerRecord {"
+ "identifier: " + t.f1.getIdentifier() + ", "
+ "timestamp: " + t.f1.getTimestamp() + ", "
+ "valueInW: " + t.f1.getValueInW() + " }")
.map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1)
.print();
}
......@@ -262,7 +242,6 @@ public class AggregationServiceFlinkJob {
// add Kafka Sink
aggregationStream
// AggregatedActivePowerRecord -> Tuple2<String, AggregatedActivePowerRecord>
.map(value -> new Tuple2<>(value.getIdentifier(), value))
.name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
.returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)))
......
......
package theodolite.uc4.application;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
......@@ -12,36 +17,31 @@ import titan.ccp.model.sensorregistry.AggregatedSensor;
import titan.ccp.model.sensorregistry.Sensor;
import titan.ccp.model.sensorregistry.SensorRegistry;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Transforms a {@link SensorRegistry} into key value pairs of Sensor identifiers and their parents'
* sensor identifiers. All pairs whose sensor's parents have changed since last iteration are
* forwarded. A mapping of an identifier to <code>null</code> means that the corresponding sensor
* no longer exists in the sensor registry.
*/
public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
public class ChildParentsFlatMapFunction
extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
private static final long serialVersionUID = 3969444219510915221L; // NOPMD
private transient MapState<String, Set<String>> state;
@Override
public void open(Configuration parameters) {
MapStateDescriptor<String, Set<String>> descriptor =
new MapStateDescriptor<String, Set<String>>(
public void open(final Configuration parameters) {
final MapStateDescriptor<String, Set<String>> descriptor =
new MapStateDescriptor<>(
"child-parents-state",
TypeInformation.of(new TypeHint<String>() {}),
TypeInformation.of(new TypeHint<Set<String>>() {}));
this.state = getRuntimeContext().getMapState(descriptor);
this.state = this.getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(SensorRegistry value, Collector<Tuple2<String, Set<String>>> out)
public void flatMap(final SensorRegistry value, final Collector<Tuple2<String, Set<String>>> out)
throws Exception {
final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(value);
this.updateChildParentsPairs(childParentsPairs);
......@@ -71,7 +71,7 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis
}
private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs)
throws Exception {
throws Exception { // NOPMD General exception thrown by Flink
final Iterator<Map.Entry<String, Set<String>>> oldChildParentsPairs = this.state.iterator();
while (oldChildParentsPairs.hasNext()) {
final Map.Entry<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next();
......@@ -89,7 +89,8 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis
}
}
private void updateState(final Map<String, Set<String>> childParentsPairs) throws Exception {
private void updateState(final Map<String, Set<String>> childParentsPairs)
throws Exception { // NOPMD General exception thrown by Flink
for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) {
if (childParentPair.getValue() == null) {
this.state.remove(childParentPair.getKey());
......
......
......@@ -12,6 +12,12 @@ import org.apache.flink.util.Collector;
import theodolite.uc4.application.util.SensorParentKey;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link RichCoFlatMapFunction} which joins each incoming {@link ActivePowerRecord} with its
* corresponding parents. The {@link ActivePowerRecord} is duplicated for each parent. When
* receiving a new set of parents for a sensor, this operator updates its internal state and
* forwards a "tombstone" record if a sensor no longer has a certain parent.
*/
public class JoinAndDuplicateCoFlatMapFunction extends
RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS
......@@ -44,17 +50,17 @@ public class JoinAndDuplicateCoFlatMapFunction extends
@Override
public void flatMap2(final Tuple2<String, Set<String>> value,
final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
final Set<String> oldParents = this.state.get(value.f0);
if (oldParents != null) {
final String sensor = value.f0;
final Set<String> oldParents = this.state.get(sensor);
final Set<String> newParents = value.f1;
if (!newParents.equals(oldParents)) {
if (oldParents != null && !newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
// Parent was deleted, emit tombstone record
out.collect(new Tuple2<>(new SensorParentKey(sensor, oldParent), null));
}
}
}
}
this.state.put(value.f0, value.f1);
this.state.put(sensor, newParents);
}
}
package theodolite.uc4.application;
import java.util.Set;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import theodolite.uc4.application.util.SensorParentKey;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * A {@link KeyedBroadcastProcessFunction} which joins each incoming {@link ActivePowerRecord} with
 * the set of parents stored for its sensor identifier in broadcast state. The record is emitted
 * once per parent, keyed by a {@link SensorParentKey}. When a new set of parents is broadcast for
 * a sensor, the broadcast state is updated and a tuple with a {@code null} record ("tombstone") is
 * emitted for every parent the sensor no longer has.
 */
public class JoinAndDuplicateKeyedBroadcastProcessFunction extends
    KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS

  private static final long serialVersionUID = -4525438547262992821L; // NOPMD

  // Descriptor of the broadcast state mapping a sensor identifier to its set of parents.
  private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
      new MapStateDescriptor<>(
          "join-and-duplicate-state",
          BasicTypeInfo.STRING_TYPE_INFO,
          TypeInformation.of(new TypeHint<Set<String>>() {}));

  /**
   * Emits the given record once for each parent currently stored for the record's identifier.
   * Nothing is emitted if no parents are stored for that identifier.
   *
   * @param value the incoming record
   * @param ctx read-only context giving access to the broadcast state
   * @param out collector receiving one {@code (SensorParentKey, record)} tuple per parent
   * @throws Exception if accessing the broadcast state fails
   */
  @Override
  public void processElement(final ActivePowerRecord value, final ReadOnlyContext ctx,
      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
    final Set<String> parents =
        ctx.getBroadcastState(this.sensorConfigStateDescriptor).get(value.getIdentifier());
    if (parents == null) {
      return;
    }
    for (final String parent : parents) {
      out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
    }
  }

  /**
   * Replaces the stored parents of a sensor with the broadcast ones and emits a tombstone (a tuple
   * whose record is {@code null}) for every previously stored parent that is not contained in the
   * new set.
   *
   * <p>A {@code null} parent set is tolerated: all previously stored parents receive a tombstone
   * and the sensor's entry is removed from the broadcast state. NOTE(review): presumably a
   * {@code null} set signals that the sensor was removed from the registry; confirm against the
   * operator producing the configuration stream.
   *
   * @param value tuple of sensor identifier ({@code f0}) and its new set of parents ({@code f1})
   * @param ctx context giving read-write access to the broadcast state
   * @param out collector receiving the tombstone tuples
   * @throws Exception if accessing the broadcast state fails
   */
  @Override
  public void processBroadcastElement(final Tuple2<String, Set<String>> value, final Context ctx,
      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
    final BroadcastState<String, Set<String>> state =
        ctx.getBroadcastState(this.sensorConfigStateDescriptor);

    final String sensor = value.f0;
    final Set<String> oldParents = state.get(sensor);
    final Set<String> newParents = value.f1;

    // Calling equals on oldParents (known non-null here) avoids dereferencing a null newParents.
    if (oldParents != null && !oldParents.equals(newParents)) {
      for (final String oldParent : oldParents) {
        if (newParents == null || !newParents.contains(oldParent)) {
          // Parent was deleted, emit tombstone record
          out.collect(new Tuple2<>(new SensorParentKey(sensor, oldParent), null));
        }
      }
    }

    if (newParents == null) {
      // Do not store a null value; processElement treats a missing entry as "no parents".
      state.remove(sensor);
    } else {
      state.put(sensor, newParents);
    }
  }

}
......@@ -15,6 +15,11 @@ import theodolite.uc4.application.util.SensorParentKey;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* A {@link ProcessWindowFunction} which performs the windowed aggregation of all
* {@link ActivePowerRecord} for the same {@link SensorParentKey}. Result of this aggregation is an
* {@link AggregatedActivePowerRecord}.
*/
public class RecordAggregationProcessWindowFunction extends
ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { // NOCS
......@@ -40,7 +45,9 @@ public class RecordAggregationProcessWindowFunction extends
}
@Override
public void process(final String key, final Context context,
public void process(
final String key,
final Context context,
final Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements,
final Collector<AggregatedActivePowerRecord> out) throws Exception {
for (final Tuple2<SensorParentKey, ActivePowerRecord> t : elements) {
......
......
......@@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output;
import java.io.Serializable;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
/**
* A {@link Serializer} for {@link ImmutableSensorRegistry}s.
*/
public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry>
implements Serializable {
......
......
......@@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output;
import java.io.Serializable;
import java.util.Set;
/**
* A {@link Serializer} for serializing arbitrary {@link Set}s of {@link Object}s.
*/
public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable {
private static final long serialVersionUID = 6919877826110724620L; // NOPMD
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment