Skip to content
Snippets Groups Projects
Commit 020839d3 authored by Sören Henning's avatar Sören Henning
Browse files

Fix code quality issues

parent 75e2dc63
No related branches found
No related tags found
No related merge requests found
Pipeline #10433 passed
...@@ -18,8 +18,6 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; ...@@ -18,8 +18,6 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
*/ */
public class ChildParentsTransformer { public class ChildParentsTransformer {
public ChildParentsTransformer() {}
/** /**
* Constructs a map of keys to their set of parents out of a SensorRegistry. * Constructs a map of keys to their set of parents out of a SensorRegistry.
* *
......
...@@ -53,16 +53,20 @@ public class HistoryService extends HazelcastJetService { ...@@ -53,16 +53,20 @@ public class HistoryService extends HazelcastJetService {
final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
final Duration windowSize = Duration.ofMillis( final Duration emirPeriod = Duration.ofMillis(
this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
final Duration gracePeriod = Duration.ofMillis(
this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
this.pipelineFactory = new Uc4PipelineFactory( this.pipelineFactory = new Uc4PipelineFactory(
kafkaProps, kafkaProps,
kafkaConfigReadProps, kafkaConfigReadProps,
kafkaAggregationReadProps, kafkaAggregationReadProps,
kafkaWriteProps, kafkaWriteProps,
this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic, this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
windowSize); emirPeriod,
gracePeriod);
} }
@Override @Override
......
...@@ -12,6 +12,7 @@ public class Uc4ConfigurationKeys { ...@@ -12,6 +12,7 @@ public class Uc4ConfigurationKeys {
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String EMIT_PERIOD_MS = "emit.period.ms"; public static final String EMIT_PERIOD_MS = "emit.period.ms";
// public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
} }
...@@ -45,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -45,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
private final Duration emitPeriod; private final Duration emitPeriod;
private final Duration gracePeriod;
/** /**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet. * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
...@@ -61,7 +63,7 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -61,7 +63,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
* @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline. * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline.
* @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline. * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline.
* @param windowSize The window size of the tumbling window used in this pipeline. * @param emitPeriod The window size of the tumbling window used in this pipeline.
*/ */
public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD
final Properties kafkaConfigPropsForPipeline, final Properties kafkaConfigPropsForPipeline,
...@@ -71,7 +73,8 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -71,7 +73,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
final String kafkaOutputTopic, final String kafkaOutputTopic,
final String kafkaConfigurationTopic, final String kafkaConfigurationTopic,
final String kafkaFeedbackTopic, final String kafkaFeedbackTopic,
final Duration windowSize) { final Duration emitPeriod,
final Duration gracePeriod) {
super(kafkaInputReadPropsForPipeline, kafkaInputTopic, super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
kafkaWritePropsForPipeline, kafkaOutputTopic); kafkaWritePropsForPipeline, kafkaOutputTopic);
...@@ -79,7 +82,8 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -79,7 +82,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline;
this.kafkaConfigurationTopic = kafkaConfigurationTopic; this.kafkaConfigurationTopic = kafkaConfigurationTopic;
this.kafkaFeedbackTopic = kafkaFeedbackTopic; this.kafkaFeedbackTopic = kafkaFeedbackTopic;
this.emitPeriod = windowSize; this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod;
} }
/** /**
...@@ -155,13 +159,13 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -155,13 +159,13 @@ public class Uc4PipelineFactory extends PipelineFactory {
////////////////////////////////// //////////////////////////////////
// (1) Configuration Stream // (1) Configuration Stream
this.pipe.readFrom(configurationSource) this.pipe.readFrom(configurationSource)
.withNativeTimestamps(0) .withNativeTimestamps(this.gracePeriod.toMillis())
.filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED
|| entry.getKey() == Event.SENSOR_REGISTRY_STATUS) || entry.getKey() == Event.SENSOR_REGISTRY_STATUS)
.map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()))) .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())))
.flatMapStateful(HashMap::new, new ConfigFlatMap()) .flatMapStateful(HashMap::new, new ConfigFlatMap())
.writeTo(Sinks.mapWithUpdating( .writeTo(Sinks.mapWithUpdating(
SENSOR_PARENT_MAP_NAME, // The addressed IMAP SENSOR_PARENT_MAP_NAME, // The addressed IMap
Entry::getKey, // The key to look for Entry::getKey, // The key to look for
(oldValue, newEntry) -> newEntry.getValue())); (oldValue, newEntry) -> newEntry.getValue()));
...@@ -169,13 +173,13 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -169,13 +173,13 @@ public class Uc4PipelineFactory extends PipelineFactory {
// (1) Sensor Input Stream // (1) Sensor Input Stream
final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe
.readFrom(inputSource) .readFrom(inputSource)
.withNativeTimestamps(0); .withNativeTimestamps(this.gracePeriod.toMillis());
////////////////////////////////// //////////////////////////////////
// (1) Aggregation Stream // (1) Aggregation Stream
final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe
.readFrom(aggregationSource) .readFrom(aggregationSource)
.withNativeTimestamps(0) .withNativeTimestamps(this.gracePeriod.toMillis())
.map(entry -> { // Map Aggregated to ActivePowerRecord .map(entry -> { // Map Aggregated to ActivePowerRecord
final AggregatedActivePowerRecord agg = entry.getValue(); final AggregatedActivePowerRecord agg = entry.getValue();
final ActivePowerRecord record = new ActivePowerRecord( final ActivePowerRecord record = new ActivePowerRecord(
...@@ -214,17 +218,11 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -214,17 +218,11 @@ public class Uc4PipelineFactory extends PipelineFactory {
final ActivePowerRecord record = entry.getValue().getRecord(); final ActivePowerRecord record = entry.getValue().getRecord();
final Set<String> groups = entry.getValue().getGroups(); final Set<String> groups = entry.getValue().getGroups();
// Transformed Data
final String[] groupList = groups.toArray(String[]::new);
final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
for (int i = 0; i < groupList.length; i++) {
newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
newEntryList.add(Util.entry(newKeyList[i], record));
}
// Return traversable list of new entry elements // Return traversable list of new entry elements
return Traversers.traverseIterable(newEntryList); return Traversers.traverseStream(
groups
.stream()
.map(group -> Util.entry(new SensorGroupKey(keyGroupId, group), record)));
}); });
////////////////////////////////// //////////////////////////////////
...@@ -253,7 +251,8 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -253,7 +251,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
return windowedLastValues return windowedLastValues
.groupingKey(entry -> entry.getKey().getGroup()) .groupingKey(entry -> entry.getKey().getGroup())
.aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue())); .aggregate(aggrOp)
.map(agg -> Util.entry(agg.getKey(), agg.getValue()));
} }
...@@ -270,7 +269,7 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -270,7 +269,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
final Map<String, Set<String>> flatMapStage, final Map<String, Set<String>> flatMapStage,
final Entry<Event, SensorRegistry> eventItem) { final Entry<Event, SensorRegistry> eventItem) {
// Transform new Input // Transform new Input
final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); final ChildParentsTransformer transformer = new ChildParentsTransformer();
final Map<String, Set<String>> mapFromRegistry = final Map<String, Set<String>> mapFromRegistry =
transformer.constructChildParentsPairs(eventItem.getValue()); transformer.constructChildParentsPairs(eventItem.getValue());
......
...@@ -10,4 +10,4 @@ kafka.feedback.topic=aggregation-feedback ...@@ -10,4 +10,4 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
emit.period.ms=5000 emit.period.ms=5000
#grace.period.ms=0 grace.period.ms=0
...@@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport { ...@@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport {
final Properties properties = new Properties(); final Properties properties = new Properties();
final Uc4PipelineFactory factory = new Uc4PipelineFactory( final Uc4PipelineFactory factory = new Uc4PipelineFactory(
properties, properties, properties, properties, "", "", properties, properties, properties, properties, "", "",
"", "", testWindowSize); "", "", testWindowSize, Duration.ofMillis(0));
this.uc4Topology = this.uc4Topology =
factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment