Skip to content
Snippets Groups Projects
Commit 40b904ee authored by Sören Henning's avatar Sören Henning
Browse files

Clean up code

parent d80dd0c3
No related branches found
No related tags found
No related merge requests found
Pipeline #10431 failed
...@@ -16,7 +16,11 @@ public class AggregatedToActive ...@@ -16,7 +16,11 @@ public class AggregatedToActive
@Override @Override
public KV<String, ActivePowerRecord> apply( public KV<String, ActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) { final KV<String, AggregatedActivePowerRecord> kv) {
return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(), return KV.of(
kv.getValue().getTimestamp(), kv.getValue().getSumInW())); kv.getKey(),
new ActivePowerRecord(
kv.getValue().getIdentifier(),
kv.getValue().getTimestamp(),
kv.getValue().getSumInW()));
} }
} }
...@@ -131,7 +131,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -131,7 +131,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
|| tuple.f0 == Event.SENSOR_REGISTRY_STATUS) || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
.name("[Filter] SensorRegistry changed") .name("[Filter] SensorRegistry changed")
.map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry") .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry")
.keyBy(sr -> 1) // .keyBy(sr -> 1)
.flatMap(new ChildParentsFlatMapFunction()) .flatMap(new ChildParentsFlatMapFunction())
.name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])"); .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet; package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import rocks.theodolite.benchmarks.commons.configuration.events.Event;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.AggregatedSensor; import rocks.theodolite.benchmarks.commons.model.sensorregistry.AggregatedSensor;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.Sensor; import rocks.theodolite.benchmarks.commons.model.sensorregistry.Sensor;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
...@@ -23,46 +16,9 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; ...@@ -23,46 +16,9 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
* does not longer exists in the sensor registry. * does not longer exists in the sensor registry.
* *
*/ */
public class ChildParentsTransformer implements public class ChildParentsTransformer {
Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> {
private final String stateStoreName; public ChildParentsTransformer() {}
// private ProcessorContext context;
private KeyValueStore<String, Set<String>> state;
public ChildParentsTransformer(final String stateStoreName) {
this.stateStoreName = stateStoreName;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
// this.context = context;
this.state = (KeyValueStore<String, Set<String>>) context.getStateStore(this.stateStoreName);
}
@Override
public Iterable<KeyValue<String, Optional<Set<String>>>> transform(final Event event,
final SensorRegistry registry) {
// Values may later be null for deleting a sensor
final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(registry);
this.updateChildParentsPairs(childParentsPairs);
this.updateState(childParentsPairs);
return childParentsPairs
.entrySet()
.stream()
.map(e -> KeyValue.pair(e.getKey(), Optional.ofNullable(e.getValue())))
.collect(Collectors.toList());
}
@Override
public void close() {
// Do nothing
}
/** /**
* Constructs a map of keys to their set of parents out of a SensorRegistry. * Constructs a map of keys to their set of parents out of a SensorRegistry.
...@@ -87,33 +43,4 @@ public class ChildParentsTransformer implements ...@@ -87,33 +43,4 @@ public class ChildParentsTransformer implements
: Stream.empty())); : Stream.empty()));
} }
private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs) {
final KeyValueIterator<String, Set<String>> oldChildParentsPairs = this.state.all();
while (oldChildParentsPairs.hasNext()) {
final KeyValue<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next();
final String identifier = oldChildParentPair.key;
final Set<String> oldParents = oldChildParentPair.value;
final Set<String> newParents = childParentsPairs.get(identifier); // null if not exists
if (newParents == null) {
// Sensor was deleted
childParentsPairs.put(identifier, null);
} else if (newParents.equals(oldParents)) {
// No changes
childParentsPairs.remove(identifier);
}
// Else: Later Perhaps: Mark changed parents
}
oldChildParentsPairs.close();
}
private void updateState(final Map<String, Set<String>> childParentsPairs) {
for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) {
if (childParentPair.getValue() == null) {
this.state.delete(childParentPair.getKey());
} else {
this.state.put(childParentPair.getKey(), childParentPair.getValue());
}
}
}
} }
...@@ -9,7 +9,6 @@ import org.apache.avro.specific.SpecificRecord; ...@@ -9,7 +9,6 @@ import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory { public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory {
private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment