Skip to content
Snippets Groups Projects
Commit 3fa22170 authored by Sören Henning's avatar Sören Henning
Browse files

Fix code style issues

parent 9863e9e4
Branches
Tags
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #5865 passed
...@@ -21,9 +21,9 @@ public class Uc1PipelineBuilder { ...@@ -21,9 +21,9 @@ public class Uc1PipelineBuilder {
/** /**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet. * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
* *
* @param kafkaPropsForPipeline Properties Object containing the necessary kafka attributes. * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
* @return A hazelcast jet pipeline which processes data for Uc1. * @return A Hazelcast Jet pipeline which processes data for Uc1.
*/ */
public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) { public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) {
...@@ -44,11 +44,12 @@ public class Uc1PipelineBuilder { ...@@ -44,11 +44,12 @@ public class Uc1PipelineBuilder {
} }
/** /**
* Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by theodolite. * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by Theodolite.
* *
* <p> * <p>
* UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into Json strings * UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into JSON strings
* using GSON. * using GSON.
* </p>
* *
* @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from. * @param source A streaming source to fetch data from.
......
...@@ -70,6 +70,7 @@ public class Uc2PipelineBuilder { ...@@ -70,6 +70,7 @@ public class Uc2PipelineBuilder {
* window and aggregates them into {@code Stats} objects. The final map returns an * window and aggregates them into {@code Stats} objects. The final map returns an
* {@code Entry<String,String>} where the key is the key of the group and the String is the * {@code Entry<String,String>} where the key is the key of the group and the String is the
* {@code .toString()} representation of the {@code Stats} object. * {@code .toString()} representation of the {@code Stats} object.
* </p>
* *
* @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from. * @param source A streaming source to fetch data from.
...@@ -101,7 +102,8 @@ public class Uc2PipelineBuilder { ...@@ -101,7 +102,8 @@ public class Uc2PipelineBuilder {
* *
* <p> * <p>
* Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
* {@Stats} Object. * {@Stats} object.
* </p>
* *
* @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
* ActivePowerRecord Objects into Stats Objects. * ActivePowerRecord Objects into Stats Objects.
......
...@@ -74,6 +74,7 @@ public class Uc3PipelineBuilder { ...@@ -74,6 +74,7 @@ public class Uc3PipelineBuilder {
* <p> * <p>
* UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates average double * UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates average double
* values for a sliding window and sorts them into the hour of the day. * values for a sliding window and sorts them into the hour of the day.
* </p>
* *
* @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from. * @param source A streaming source to fetch data from.
......
...@@ -3,7 +3,7 @@ package theodolite.uc3.application.uc3specifics; ...@@ -3,7 +3,7 @@ package theodolite.uc3.application.uc3specifics;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* A factory class to build a {@link HourOfDayKey} * A factory class to build an {@link HourOfDayKey}.
* *
*/ */
public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
......
package theodolite.uc4.application; package theodolite.uc4.application;
import com.hazelcast.function.BiFunctionEx; import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser; import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers; import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util; import com.hazelcast.jet.Util;
...@@ -105,6 +104,7 @@ public class Uc4PipelineBuilder { ...@@ -105,6 +104,7 @@ public class Uc4PipelineBuilder {
* from keys to groups to map values to their accourding groups. A feedback stream allows for * from keys to groups to map values to their accourding groups. A feedback stream allows for
* group keys to be mapped to values and eventually to be mapped to other top level groups defines * group keys to be mapped to values and eventually to be mapped to other top level groups defines
* by the {@code SensorRegistry}. * by the {@code SensorRegistry}.
* </p>
* *
* <p> * <p>
* 6 Step topology: <br> * 6 Step topology: <br>
...@@ -114,6 +114,7 @@ public class Uc4PipelineBuilder { ...@@ -114,6 +114,7 @@ public class Uc4PipelineBuilder {
* (4) Duplicate as flatmap per value and group <br> * (4) Duplicate as flatmap per value and group <br>
* (5) Window (preperation for possible last values) <br> * (5) Window (preperation for possible last values) <br>
* (6) Aggregate data over the window * (6) Aggregate data over the window
* </p>
* *
* @param pipe The blank pipeline to extend the logic to. * @param pipe The blank pipeline to extend the logic to.
* @param inputSource A streaming source with {@code ActivePowerRecord} data. * @param inputSource A streaming source with {@code ActivePowerRecord} data.
...@@ -136,7 +137,7 @@ public class Uc4PipelineBuilder { ...@@ -136,7 +137,7 @@ public class Uc4PipelineBuilder {
.filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED
|| entry.getKey() == Event.SENSOR_REGISTRY_STATUS) || entry.getKey() == Event.SENSOR_REGISTRY_STATUS)
.map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()))) .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())))
.flatMapStateful(this.hashMapSupplier(), this.configFlatMap()) .flatMapStateful(HashMap::new, new ConfigFlatMap())
.writeTo(Sinks.mapWithUpdating( .writeTo(Sinks.mapWithUpdating(
SENSOR_PARENT_MAP_NAME, // The addressed IMAP SENSOR_PARENT_MAP_NAME, // The addressed IMAP
Entry::getKey, // The key to look for Entry::getKey, // The key to look for
...@@ -237,21 +238,18 @@ public class Uc4PipelineBuilder { ...@@ -237,21 +238,18 @@ public class Uc4PipelineBuilder {
}); });
} }
/** /**
* Returns a function which supplies a {@code HashMapy<String, Set<String>>()}. * FlatMap function used to process the configuration input for UC4.
*/ */
private SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier() { private static class ConfigFlatMap implements
return HashMap::new; BiFunctionEx<Map<String, Set<String>>, Entry<Event, SensorRegistry>, Traverser<Entry<String, Set<String>>>> { // NOCS
}
/** private static final long serialVersionUID = -6769931374907428699L;
* Returns a function which supplies the flatMap function used to process the configuration input
* for UC4.
*/
private BiFunctionEx<? super HashMap<String, Set<String>>, ? super Entry<Event, SensorRegistry>, ? extends Traverser<Entry<String, Set<String>>>> configFlatMap() {
return (flatMapStage, eventItem) -> {
@Override
public Traverser<Entry<String, Set<String>>> applyEx(
final Map<String, Set<String>> flatMapStage,
final Entry<Event, SensorRegistry> eventItem) {
// Transform new Input // Transform new Input
final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name");
final Map<String, Set<String>> mapFromRegistry = final Map<String, Set<String>> mapFromRegistry =
...@@ -275,7 +273,8 @@ public class Uc4PipelineBuilder { ...@@ -275,7 +273,8 @@ public class Uc4PipelineBuilder {
// Return traverser with updates list. // Return traverser with updates list.
return Traversers.traverseIterable(updatesList) return Traversers.traverseIterable(updatesList)
.map(e -> Util.entry(e.getKey(), e.getValue())); .map(e -> Util.entry(e.getKey(), e.getValue()));
}; }
} }
} }
...@@ -5,7 +5,7 @@ import java.util.HashMap; ...@@ -5,7 +5,7 @@ import java.util.HashMap;
import java.util.Set; import java.util.Set;
/** /**
* Supplies a {@link HashMap} and implements {@link SupplierEx} * Supplies a {@link HashMap} and implements {@link SupplierEx}.
*/ */
public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> { public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment