diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
index 4d965ec84bf4a0b26e4b6fdf006e1ee192bad11d..9a8a7bed11659ea7bce419ee9a289e141fd0b7f5 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle
@@ -27,6 +27,7 @@ dependencies {
   implementation 'io.confluent:kafka-avro-serializer:5.3.0'
   implementation 'org.slf4j:slf4j-api:1.7.25'
   implementation 'com.google.code.gson:gson:2.8.2'
+  implementation 'com.google.guava:guava:24.1-jre'
   implementation project(':hazelcastjet-commons')
   compile 'com.hazelcast.jet:hazelcast-jet-core:4.5:tests'
   compile 'com.hazelcast:hazelcast:4.2:tests'
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java
index e4955376c5c4ca7a4ebfd60d7a216170acdff71e..dd741a8d63e276c043bf3db883be34666c5740ae 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java
@@ -1,5 +1,6 @@
 package theodolite.uc2.application;
 
+import com.google.common.math.StatsAccumulator;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -8,6 +9,7 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
 import theodolite.commons.hazelcastjet.JetInstanceBuilder;
+import theodolite.uc2.application.uc2specifics.StatsAccumulatorSerializer;
 
 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC2
@@ -56,6 +58,7 @@ public class Uc2HazelcastJetFactory {
 
     // Adds the job name and joins a job to the JetInstance defined in this factory
     final JobConfig jobConfig = new JobConfig();
+    jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
     jobConfig.setName(jobName);
     this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join();
   }
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java
index a1c867bf3cb7bee44d1439e6e24eac1d31e5c7e6..fae23d682100447802e8f8e6a4521f0a674fe3ad 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java
@@ -1,6 +1,9 @@
 package theodolite.uc2.application;
 
-import com.hazelcast.jet.aggregate.AggregateOperations;
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -8,7 +11,9 @@ import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import theodolite.uc2.application.uc2specifics.StatsAccumulatorSupplier;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -36,6 +41,19 @@ public class Uc2PipelineBuilder {
       final String kafkaOutputTopic,
       final int downsampleInterval) {
 
+    // Aggregate operation that folds the power values (in W) of the
+    // Entry<String, ActivePowerRecord> items of a window into a Stats object using a
+    // StatsAccumulator.
+    final AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats> aggrOp =
+        AggregateOperation
+            .withCreate(new StatsAccumulatorSupplier())
+            // The explicit type argument fixes the item type, so no unchecked cast is needed.
+            .<Entry<String, ActivePowerRecord>>andAccumulate(
+                (accumulator, item) -> accumulator.add(item.getValue().getValueInW()))
+            // Merges the values of the right accumulator into the left one.
+            .andCombine((left, right) -> left.addAll(right.snapshot()))
+            .andExportFinish(StatsAccumulator::snapshot);
+
     final Pipeline pipe = Pipeline.create();
     final StreamStage<Map.Entry<String, String>> mapProduct =
         pipe.readFrom(KafkaSources.<String, ActivePowerRecord>kafka(
@@ -44,11 +62,10 @@ public class Uc2PipelineBuilder {
             .setLocalParallelism(1)
             .groupingKey(record -> record.getValue().getIdentifier())
             .window(WindowDefinition.tumbling(downsampleInterval))
-            .aggregate(
-                AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
+            .aggregate(aggrOp)
             .map(agg -> {
+              String theKey = agg.key();
               String theValue = agg.getValue().toString();
-              String theKey = agg.getKey().toString();
               return Map.entry(theKey, theValue);
             });
     // Add Sink1: Logger
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..7257ef18388176f2fe7ee0d3525ce3cbd0a67944
--- /dev/null
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java
@@ -0,0 +1,48 @@
+package theodolite.uc2.application.uc2specifics;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import java.io.IOException;
+
+/**
+ * A serializer and deserializer for the StatsAccumulator which is used in the UC2 implementation
+ * using Hazelcast Jet.
+ */
+public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> {
+
+  // Type id that getTypeId() reports for this serializer.
+  private static final int TYPE_ID = 69420;
+
+  /**
+   * Returns the type id of this serializer.
+   */
+  @Override
+  public int getTypeId() {
+    return TYPE_ID;
+  }
+
+  /**
+   * Writes the byte representation of the accumulator's current Stats snapshot.
+   */
+  @Override
+  public void write(final ObjectDataOutput out, final StatsAccumulator object) throws IOException {
+    final byte[] byteArray = object.snapshot().toByteArray();
+    out.writeByteArray(byteArray);
+  }
+
+  /**
+   * Reads a Stats snapshot and returns a new StatsAccumulator to which it has been added.
+   */
+  @Override
+  public StatsAccumulator read(final ObjectDataInput in) throws IOException {
+    final byte[] byteArray = in.readByteArray();
+    final Stats deserializedStats = Stats.fromByteArray(byteArray);
+    final StatsAccumulator accumulator = new StatsAccumulator();
+    accumulator.addAll(deserializedStats);
+    return accumulator;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSupplier.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSupplier.java
new file mode 100644
index 0000000000000000000000000000000000000000..6a5c1a5169f150254e5eadfb1112a15c5533c467
--- /dev/null
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSupplier.java
@@ -0,0 +1,22 @@
+package theodolite.uc2.application.uc2specifics;
+
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.function.SupplierEx;
+
+/**
+ * Supplies a StatsAccumulator. Is used in the aggregation operation of the Hazelcast Jet
+ * implementation for UC2.
+ */
+public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
+
+  private static final long serialVersionUID = -656395626316842910L;//NOPMD
+
+  /**
+   * Gets a StatsAccumulator.
+   */
+  @Override
+  public StatsAccumulator getEx() throws Exception {
+    return new StatsAccumulator();
+  }
+
+}