diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java index 5643747a218385ac73f540970932505ea4165eaf..5d35e83eac8cddb4121603058c143130b9eeb4a8 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java @@ -21,9 +21,9 @@ public class Uc1PipelineBuilder { /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. * - * @param kafkaPropsForPipeline Properties Object containing the necessary kafka attributes. + * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes. * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @return A hazelcast jet pipeline which processes data for Uc1. + * @return A Hazelcast Jet pipeline which processes data for Uc1. */ public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) { @@ -44,11 +44,12 @@ public class Uc1PipelineBuilder { } /** - * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by theodolite. + * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by Theodolite. * * <p> - * UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into Json strings + * UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into JSON strings * using GSON. + * </p> * * @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param source A streaming source to fetch data from. diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java index dea01a1bed648171374db8568e22e87ad1904f28..db7521b2cbf3c662ed1bf141be0749acf41cbce3 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java @@ -70,6 +70,7 @@ public class Uc2PipelineBuilder { * window and aggregates them into {@code Stats} objects. The final map returns an * {@code Entry<String,String>} where the key is the key of the group and the String is the * {@code .toString()} representation of the {@code Stats} object. + * </p> * * @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param source A streaming source to fetch data from. @@ -101,7 +102,8 @@ public class Uc2PipelineBuilder { * * <p> * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a - * {@Stats} Object. + * {@Stats} object. + * </p> * * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates * ActivePowerRecord Objects into Stats Objects. diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java index f987f82b7230d6a2ba2f98a3ba35796802e0320d..82bf0dd30a6009cc64c71574adddee53958309e4 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java @@ -74,6 +74,7 @@ public class Uc3PipelineBuilder { * <p> * UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates average double * values for a sliding window and sorts them into the hour of the day. + * </p> * * @param pipe The blank hazelcast jet pipeline to extend the logic to. * @param source A streaming source to fetch data from. diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HoursOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HoursOfDayKeyFactory.java index 5661830e2698e5bb2e3486af4069e544009bc64f..aa4814ff928da5f345d7fb14b14d7d978442ff46 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HoursOfDayKeyFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HoursOfDayKeyFactory.java @@ -3,7 +3,7 @@ package theodolite.uc3.application.uc3specifics; import java.time.LocalDateTime; /** - * A factory class to build a {@link HourOfDayKey} + * A factory class to build an {@link HourOfDayKey}. * */ public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java index 104a59801d1b4e5e5c78b7e2df4c1278686d6b5a..7f3a87e15f3543843f059ff258457794d259e457 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java @@ -1,7 +1,6 @@ package theodolite.uc4.application; import com.hazelcast.function.BiFunctionEx; -import com.hazelcast.function.SupplierEx; import com.hazelcast.jet.Traverser; import com.hazelcast.jet.Traversers; import com.hazelcast.jet.Util; @@ -105,6 +104,7 @@ public class Uc4PipelineBuilder { * from keys to groups to map values to their accourding groups. A feedback stream allows for * group keys to be mapped to values and eventually to be mapped to other top level groups defines * by the {@code SensorRegistry}. + * </p> * * <p> * 6 Step topology: <br> @@ -114,6 +114,7 @@ public class Uc4PipelineBuilder { * (4) Duplicate as flatmap per value and group <br> * (5) Window (preperation for possible last values) <br> * (6) Aggregate data over the window + * </p> * * @param pipe The blank pipeline to extend the logic to. * @param inputSource A streaming source with {@code ActivePowerRecord} data. @@ -136,7 +137,7 @@ public class Uc4PipelineBuilder { .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()))) - .flatMapStateful(this.hashMapSupplier(), this.configFlatMap()) + .flatMapStateful(HashMap::new, new ConfigFlatMap()) .writeTo(Sinks.mapWithUpdating( SENSOR_PARENT_MAP_NAME, // The addressed IMAP Entry::getKey, // The key to look for @@ -237,21 +238,18 @@ public class Uc4PipelineBuilder { }); } - /** - * Returns a function which supplies a {@code HashMapy<String, Set<String>>()}. + * FlatMap function used to process the configuration input for UC4. */ - private SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier() { - return HashMap::new; - } + private static class ConfigFlatMap implements + BiFunctionEx<Map<String, Set<String>>, Entry<Event, SensorRegistry>, Traverser<Entry<String, Set<String>>>> { // NOCS - /** - * Returns a function which supplies the flatMap function used to process the configuration input - * for UC4. - */ - private BiFunctionEx<? super HashMap<String, Set<String>>, ? super Entry<Event, SensorRegistry>, ? extends Traverser<Entry<String, Set<String>>>> configFlatMap() { - return (flatMapStage, eventItem) -> { + private static final long serialVersionUID = -6769931374907428699L; + @Override + public Traverser<Entry<String, Set<String>>> applyEx( + final Map<String, Set<String>> flatMapStage, + final Entry<Event, SensorRegistry> eventItem) { // Transform new Input final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); final Map<String, Set<String>> mapFromRegistry = @@ -275,7 +273,8 @@ public class Uc4PipelineBuilder { // Return traverser with updates list. return Traversers.traverseIterable(updatesList) .map(e -> Util.entry(e.getKey(), e.getValue())); - }; + } + } } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java index f18f8ecd6b4844b07bc587290eed12952c70f717..e55278c1d5db4f197d86dcfbbdbe2d38e87c1b6c 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.Set; /** - * Supplies a {@link HashMap} and implements {@link SupplierEx} + * Supplies a {@link HashMap} and implements {@link SupplierEx}. */ public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> {