diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java index d90f2b1edb448cf29b15db39695a418cbb3389ba..3cb30bf239d8035ce3813063ee6089ec7440c441 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java @@ -32,9 +32,6 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory; -import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder; -import static com.hazelcast.logging.Logger.getLogger; - /** * Test methods for the Hazelcast Jet Implementation of UC1. */ @@ -88,16 +85,16 @@ public class Uc1PipelineTest extends JetTestSupport { // Create pipeline to test final Properties properties = new Properties(); - final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties,""); - uc1Topology = factory.extendUc1Topology(testSource); + final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties, ""); + this.uc1Topology = factory.extendUc1Topology(testSource); this.testPipeline = factory.getPipe(); // Create DatabaseWriter sink final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter(); final Sink<String> sink = sinkBuilder( "database-sink", x -> adapter) - .<String>receiveFn(DatabaseWriter::write) - .build(); + .<String>receiveFn(DatabaseWriter::write) + .build(); // Map Stage, can be used instead of sink // StreamStage<String> log = uc1Topology.map(s -> { diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index 7fbcc1cfe52eafcdeb1741b8f888ffce1e40909b..ce95d1a8cdd19b7c925c8b5b71e9ae534b085673 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -11,7 +11,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; /** diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java similarity index 94% rename from theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java rename to theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java index 5c22b8dd6cc1a7af995a98b4388f40a1a3867ba5..8d4793dc03cf180ee6029a181842d0af07e37f91 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java @@ -1,4 +1,4 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics; +package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java similarity index 88% rename from theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java rename to theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java index f4d203f03185cda712a5280634d8d3858c02f30d..457365799473f7d0740b7bbfcfbffd162308476e 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java @@ -1,4 +1,4 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics; +package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.google.common.math.StatsAccumulator; import com.hazelcast.function.SupplierEx; diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..dcfa561771b52e548e553c4673041303b17b3b55 --- /dev/null +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java @@ -0,0 +1,48 @@ +package rocks.theodolite.benchmarks.uc2.hazelcastjet; + +import com.google.common.math.Stats; +import com.google.common.math.StatsAccumulator; +import com.hazelcast.jet.aggregate.AggregateOperation; +import com.hazelcast.jet.aggregate.AggregateOperation1; +import java.util.Map.Entry; +import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; + + +/** + * Factory for creating an aggregation operator for {@link Stats} objects. + */ +public final class StatsAggregatorFactory { + + /** + * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast + * Jet implementation of UC2. + * + * <p> + * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a + * {@link Stats} object. + * </p> + * + * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates + * ActivePowerRecord Objects into Stats Objects. + */ + public static AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS + create() { + // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using + // the StatsAccumulator. + return AggregateOperation + // Creates the accumulator + .withCreate(new StatsAccumulatorSupplier()) + // Defines the accumulation + .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> { + accumulator.add(item.getValue().getValueInW()); + }) + // Defines the combination of spread out instances + .andCombine((left, right) -> { + final Stats rightStats = right.snapshot(); + left.addAll(rightStats); + + }) + // Finishes the aggregation + .andExportFinish(StatsAccumulator::snapshot); + } +} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index 02e0ceb07e95bddb6c6fc1540f75a19642ee58a2..52096b86083b71f2e0cb80cd8cd9a1040ac8cb68 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -1,9 +1,5 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; -import com.google.common.math.Stats; -import com.google.common.math.StatsAccumulator; -import com.hazelcast.jet.aggregate.AggregateOperation; -import com.hazelcast.jet.aggregate.AggregateOperation1; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; @@ -13,16 +9,13 @@ import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; import java.time.Duration; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier; /** - * PipelineFactory for use case 2. - * Allows to build and extend a pipeline. + * PipelineFactory for use case 2. Allows to build and extend a pipeline. */ public class Uc2PipelineFactory extends PipelineFactory { @@ -30,6 +23,7 @@ public class Uc2PipelineFactory extends PipelineFactory { /** * Factory for uc2 pipelines. + * * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads * attributes. * @param kafkaInputTopic The name of the input topic used for the pipeline. @@ -40,12 +34,12 @@ public class Uc2PipelineFactory extends PipelineFactory { * of this pipeline. */ protected Uc2PipelineFactory(final Properties kafkaReadPropsForPipeline, - final String kafkaInputTopic, - final Properties kafkaWritePropsForPipeline, - final String kafkaOutputTopic, - final Duration downsampleIntervalInMs) { + final String kafkaInputTopic, + final Properties kafkaWritePropsForPipeline, + final String kafkaOutputTopic, + final Duration downsampleIntervalInMs) { super(kafkaReadPropsForPipeline, kafkaInputTopic, - kafkaWritePropsForPipeline,kafkaOutputTopic); + kafkaWritePropsForPipeline, kafkaOutputTopic); this.downsampleInterval = downsampleIntervalInMs; } @@ -60,17 +54,18 @@ public class Uc2PipelineFactory extends PipelineFactory { // Define the Kafka Source final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); + KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline, + this.kafkaInputTopic); // Extend UC2 topology to the pipeline final StreamStage<Map.Entry<String, String>> uc2TopologyProduct = this.extendUc2Topology(kafkaSource); // Add Sink1: Logger - uc2TopologyProduct.writeTo(Sinks.logger()); + uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations // Add Sink2: Write back to kafka for the final benchmark uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); + this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); return this.pipe; } @@ -90,15 +85,15 @@ public class Uc2PipelineFactory extends PipelineFactory { * and value of the Entry object. It can be used to be further modified or directly be * written into a sink. */ - public StreamStage<Map.Entry<String, String>> - extendUc2Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { + public StreamStage<Map.Entry<String, String>> extendUc2Topology( + final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { // Build the pipeline topology. - return pipe.readFrom(source) + return this.pipe.readFrom(source) .withNativeTimestamps(0) .setLocalParallelism(1) .groupingKey(record -> record.getValue().getIdentifier()) - .window(WindowDefinition.tumbling(downsampleInterval.toMillis())) - .aggregate(this.uc2AggregateOperation()) + .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis())) + .aggregate(StatsAggregatorFactory.create()) .map(agg -> { final String theKey = agg.key(); final String theValue = agg.getValue().toString(); @@ -106,37 +101,4 @@ public class Uc2PipelineFactory extends PipelineFactory { }); } - /** - * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast - * Jet implementation of UC2. - * - * <p> - * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a - * {@link Stats} object. - * </p> - * - * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates - * ActivePowerRecord Objects into Stats Objects. - */ - public AggregateOperation1<Entry<String, ActivePowerRecord>, - StatsAccumulator, Stats> uc2AggregateOperation() { // NOCS - // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using - // the Statsaccumulator. - return AggregateOperation - // Creates the accumulator - .withCreate(new StatsAccumulatorSupplier()) - // Defines the accumulation - .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> { - accumulator.add(item.getValue().getValueInW()); - }) - // Defines the combination of spread out instances - .andCombine((left, right) -> { - final Stats rightStats = right.snapshot(); - left.addAll(rightStats); - - }) - // Finishes the aggregation - .andExportFinish( - StatsAccumulator::snapshot); - } } diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java index 8461d9cebe9e9556744777d0626cfeca5152b13d..8b44bcd5f0451562254bac9a9a50c641702d7d52 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java @@ -5,14 +5,12 @@ import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JetConfig; import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; - import java.time.Duration; import java.util.Map; import java.util.Map.Entry; @@ -67,7 +65,7 @@ public class Uc2PipelineTest extends JetTestSupport { // Create pipeline to test final Properties properties = new Properties(); final Uc2PipelineFactory factory = new Uc2PipelineFactory( - properties,"",properties,"", testWindow); + properties, "", properties, "", testWindow); this.uc2Topology = factory.extendUc2Topology(testSource); this.testPipeline = factory.getPipe();