Commit 824fd27f authored by MaxEmerold, committed by Sören Henning

Change UC2 Hazelcast Jet Implementation

- Changed pipeline structure
- Aggregation step now produces Stats objects (condensed sketch below)
- Final output is a string representation of keys and Stats objects
- Added supplier and serializer for StatsAccumulator (used in the pipeline)
- Fixed Checkstyle/PMD issues
parent 1ab16534
1 merge request: !208 Add benchmark implementations for Hazelcast Jet
Pipeline #5483 failed
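For orientation before the diff: a condensed, standalone sketch of the new aggregation step. All library and project types come from the commit itself; the class name AggregationSketch and the method name statsOfPowerValues are illustrative only, and the dedicated StatsAccumulatorSupplier is replaced here by a method reference, which Jet's SupplierEx (a serializable functional interface) also accepts.

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import java.util.Map.Entry;
import titan.ccp.model.records.ActivePowerRecord;

public final class AggregationSketch {

  private AggregationSketch() {}

  /**
   * Builds the Stats-producing aggregate operation: each windowed record's power value is added
   * to a StatsAccumulator, partial accumulators are merged via snapshots, and the final result
   * is an immutable Stats object.
   */
  public static AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats>
      statsOfPowerValues() {
    return AggregateOperation
        .withCreate(StatsAccumulator::new) // the commit uses StatsAccumulatorSupplier here
        .<Entry<String, ActivePowerRecord>>andAccumulate(
            (accumulator, entry) -> accumulator.add(entry.getValue().getValueInW()))
        .andCombine((left, right) -> left.addAll(right.snapshot()))
        .andExportFinish(StatsAccumulator::snapshot);
  }
}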
@@ -27,6 +27,7 @@ dependencies {
   implementation 'io.confluent:kafka-avro-serializer:5.3.0'
   implementation 'org.slf4j:slf4j-api:1.7.25'
   implementation 'com.google.code.gson:gson:2.8.2'
+  implementation 'com.google.guava:guava:24.1-jre'
   implementation project(':hazelcastjet-commons')
   compile 'com.hazelcast.jet:hazelcast-jet-core:4.5:tests'
   compile 'com.hazelcast:hazelcast:4.2:tests'
......
 package theodolite.uc2.application;

+import com.google.common.math.StatsAccumulator;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -8,6 +9,7 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
 import theodolite.commons.hazelcastjet.JetInstanceBuilder;
+import theodolite.uc2.application.uc2specifics.StatsAccumulatorSerializer;

 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC2
@@ -56,6 +58,7 @@ public class Uc2HazelcastJetFactory {
     // Adds the job name and joins a job to the JetInstance defined in this factory
     final JobConfig jobConfig = new JobConfig();
+    jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
     jobConfig.setName(jobName);
     this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join();
   }
......
 package theodolite.uc2.application;

-import com.hazelcast.jet.aggregate.AggregateOperations;
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -8,7 +11,9 @@ import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import theodolite.uc2.application.uc2specifics.StatsAccumulatorSupplier;
 import titan.ccp.model.records.ActivePowerRecord;

 /**
@@ -36,6 +41,26 @@ public class Uc2PipelineBuilder {
       final String kafkaOutputTopic,
       final int downsampleInterval) {
+    // Aggregate operation to create a Stats object from Entry<String, ActivePowerRecord> items
+    // using a StatsAccumulator.
+    final AggregateOperation1<Object, StatsAccumulator, Stats> aggrOp = AggregateOperation
+        .withCreate(new StatsAccumulatorSupplier())
+        .andAccumulate((accumulator, item) -> {
+          final Entry<String, ActivePowerRecord> castedEntry =
+              (Entry<String, ActivePowerRecord>) item;
+          accumulator.add(castedEntry.getValue().getValueInW());
+        })
+        .andCombine((left, right) -> {
+          final Stats rightStats = right.snapshot();
+          left.addAll(rightStats);
+        })
+        .andExportFinish(accumulator -> accumulator.snapshot());
+
     final Pipeline pipe = Pipeline.create();
     final StreamStage<Map.Entry<String, String>> mapProduct =
         pipe.readFrom(KafkaSources.<String, ActivePowerRecord>kafka(
@@ -44,11 +69,10 @@ public class Uc2PipelineBuilder {
             .setLocalParallelism(1)
             .groupingKey(record -> record.getValue().getIdentifier())
             .window(WindowDefinition.tumbling(downsampleInterval))
-            .aggregate(
-                AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
+            .aggregate(aggrOp)
             .map(agg -> {
-              String theKey = agg.key();
               String theValue = agg.getValue().toString();
+              String theKey = agg.getKey().toString();
               return Map.entry(theKey, theValue);
             });

     // Add Sink1: Logger
......
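The andCombine step in the diff above leans on Guava's StatsAccumulator/Stats semantics: merging one accumulator's snapshot into another yields the same statistics as adding all values to a single accumulator. A minimal plain-Java illustration (the class name StatsCombineCheck is hypothetical; the values in the comments assume the three sample inputs shown):

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;

public final class StatsCombineCheck {

  public static void main(final String[] args) {
    // Two partial accumulators, as they would exist on different Jet processors.
    final StatsAccumulator left = new StatsAccumulator();
    left.add(10.0);
    left.add(20.0);

    final StatsAccumulator right = new StatsAccumulator();
    right.add(30.0);

    // Combine like the pipeline's andCombine step: merge the snapshot of one accumulator
    // into the other.
    left.addAll(right.snapshot());

    // Export/finish: an immutable Stats object; its toString() is what the new pipeline
    // writes as the Kafka record value.
    final Stats result = left.snapshot();
    System.out.println(result.count()); // 3
    System.out.println(result.mean());  // 20.0
    System.out.println(result);         // e.g. Stats{count=3, mean=20.0, ...}
  }
}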
package theodolite.uc2.application.uc2specifics;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
/**
* A serializer and deserializer for the StatsAccumulator which is used in the UC2 implementation
* using Hazelcast Jet.
*/
public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> {
private static final int TYPE_ID = 69420;
  @Override
  public int getTypeId() {
    return TYPE_ID;
  }
@Override
public void write(final ObjectDataOutput out, final StatsAccumulator object) throws IOException {
final byte[] byteArray = object.snapshot().toByteArray();
out.writeByteArray(byteArray);
}
@Override
public StatsAccumulator read(final ObjectDataInput in) throws IOException {
final byte[] byteArray = in.readByteArray();
final Stats deserializedStats = Stats.fromByteArray(byteArray);
final StatsAccumulator accumulator = new StatsAccumulator();
accumulator.addAll(deserializedStats);
return accumulator;
}
}
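The serializer above relies on Guava's own byte representation: a StatsAccumulator offers no byte form itself, but its immutable Stats snapshot provides toByteArray() and Stats.fromByteArray(), and a fresh accumulator can be rebuilt via addAll. A short round-trip sketch outside of Hazelcast (the class name StatsRoundTrip is hypothetical):

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;

public final class StatsRoundTrip {

  public static void main(final String[] args) {
    final StatsAccumulator original = new StatsAccumulator();
    original.add(1.0);
    original.add(2.0);
    original.add(3.0);

    // write(): snapshot the accumulator and serialize the immutable Stats to bytes.
    final byte[] bytes = original.snapshot().toByteArray();

    // read(): deserialize the Stats and rebuild an accumulator from it.
    final StatsAccumulator restored = new StatsAccumulator();
    restored.addAll(Stats.fromByteArray(bytes));

    System.out.println(restored.count()); // 3
    System.out.println(restored.mean());  // 2.0
  }
}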
package theodolite.uc2.application.uc2specifics;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.function.SupplierEx;
/**
* Supplies a StatsAccumulator. Is used in the aggregation operation of the Hazelcast Jet
* implementation for UC2.
*/
public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
  private static final long serialVersionUID = -656395626316842910L; // NOPMD
/**
* Gets a StatsAccumulator.
*/
@Override
public StatsAccumulator getEx() throws Exception {
return new StatsAccumulator();
}
}
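As used in the pipeline builder above, this supplier is handed to AggregateOperation.withCreate(...), which expects a SupplierEx. Because SupplierEx extends java.util.function.Supplier and java.io.Serializable, a method reference such as StatsAccumulator::new would also satisfy that signature; the sketch below (hypothetical class name SupplierVariants) shows both forms side by side.

import com.google.common.math.StatsAccumulator;
import com.hazelcast.function.SupplierEx;
import theodolite.uc2.application.uc2specifics.StatsAccumulatorSupplier;

public final class SupplierVariants {

  private SupplierVariants() {}

  public static void main(final String[] args) {
    // Variant used by the commit: the dedicated supplier class.
    final SupplierEx<StatsAccumulator> fromClass = new StatsAccumulatorSupplier();

    // Functionally equivalent alternative: a serializable method reference.
    final SupplierEx<StatsAccumulator> fromMethodRef = StatsAccumulator::new;

    System.out.println(fromClass.get().count());     // 0
    System.out.println(fromMethodRef.get().count()); // 0
  }
}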