Skip to content
Snippets Groups Projects
Commit 4702e63d authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Change hazelcast to output correct types

+ correct small spelling errors
parent 13ea8dd9
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #6677 passed
...@@ -4,7 +4,8 @@ import com.hazelcast.function.BiFunctionEx; ...@@ -4,7 +4,8 @@ import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.jet.Traverser; import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers; import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util; import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations; import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
...@@ -24,11 +25,13 @@ import java.util.Properties; ...@@ -24,11 +25,13 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.uc4.application.uc4specifics.AggregatedActivePowerRecordAccumulator;
import theodolite.uc4.application.uc4specifics.ChildParentsTransformer; import theodolite.uc4.application.uc4specifics.ChildParentsTransformer;
import theodolite.uc4.application.uc4specifics.SensorGroupKey; import theodolite.uc4.application.uc4specifics.SensorGroupKey;
import theodolite.uc4.application.uc4specifics.ValueGroup; import theodolite.uc4.application.uc4specifics.ValueGroup;
import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;
/** /**
...@@ -76,33 +79,47 @@ public class Uc4PipelineBuilder { ...@@ -76,33 +79,47 @@ public class Uc4PipelineBuilder {
LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline); LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline);
} }
// The pipeline for this Use Case // The pipeline for this Use Case
final Pipeline uc4Pipeline = Pipeline.create(); final Pipeline uc4Pipeline = Pipeline.create();
// Sources for this use case // Sources for this use case
final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka( final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka(
kafkaConfigPropsForPipeline, kafkaConfigurationTopic); kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
final StreamSource<Entry<String, ActivePowerRecord>> inputSource = final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
KafkaSources.<String, ActivePowerRecord>kafka( KafkaSources.<String, ActivePowerRecord>kafka(
kafkaInputReadPropsForPipeline, kafkaInputTopic); kafkaInputReadPropsForPipeline, kafkaInputTopic);
final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource =
KafkaSources.<String, ActivePowerRecord> final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource =
KafkaSources.<String, AggregatedActivePowerRecord>
kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic); kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
// Extend UC4 topology to pipeline // Extend UC4 topology to pipeline
final StreamStage<Entry<String, ActivePowerRecord>> uc4Product = final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource, this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource,
windowSize); windowSize);
// Add Sink1: Write back to kafka output topic
uc4Product.writeTo(KafkaSinks.kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
// Add Sink2: Write back to kafka feedback/aggregation topic // Add Sink2: Write back to kafka feedback/aggregation topic
uc4Product.writeTo(KafkaSinks.kafka( uc4Aggregation.writeTo(KafkaSinks.kafka(
kafkaWritePropsForPipeline, kafkaFeedbackTopic)); kafkaWritePropsForPipeline, kafkaFeedbackTopic));
// Add Sink3: Logger
// Log aggregation product
uc4Aggregation.writeTo(Sinks.logger());
// Map Aggregated to ActivePowerRecord
final StreamStage<Entry<String, ActivePowerRecord>> uc4Product = uc4Aggregation
.map(entry -> {
final AggregatedActivePowerRecord agg = entry.getValue();
final ActivePowerRecord record = new ActivePowerRecord(
agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
return Util.entry(entry.getKey(), record);
});
// Add Sink2: Write back to kafka output topic
uc4Product.writeTo(KafkaSinks.kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
// Logger uc4 product
uc4Product.writeTo(Sinks.logger()); uc4Product.writeTo(Sinks.logger());
// Return the pipeline // Return the pipeline
...@@ -114,7 +131,7 @@ public class Uc4PipelineBuilder { ...@@ -114,7 +131,7 @@ public class Uc4PipelineBuilder {
* *
* <p> * <p>
* UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} with maps * UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} with maps
* from keys to groups to map values to their accourding groups. A feedback stream allows for * from keys to groups to map values to their according groups. A feedback stream allows for
* group keys to be mapped to values and eventually to be mapped to other top level groups defines * group keys to be mapped to values and eventually to be mapped to other top level groups defines
* by the {@code SensorRegistry}. * by the {@code SensorRegistry}.
* </p> * </p>
...@@ -125,7 +142,7 @@ public class Uc4PipelineBuilder { ...@@ -125,7 +142,7 @@ public class Uc4PipelineBuilder {
* (2) Merge Input Values and Aggregations <br> * (2) Merge Input Values and Aggregations <br>
* (3) Join Configuration with Merged Input Stream <br> * (3) Join Configuration with Merged Input Stream <br>
* (4) Duplicate as flatmap per value and group <br> * (4) Duplicate as flatmap per value and group <br>
* (5) Window (preperation for possible last values) <br> * (5) Window (preparation for possible last values) <br>
* (6) Aggregate data over the window * (6) Aggregate data over the window
* </p> * </p>
* *
...@@ -138,10 +155,10 @@ public class Uc4PipelineBuilder { ...@@ -138,10 +155,10 @@ public class Uc4PipelineBuilder {
* according aggregated values. The data can be further modified or directly be linked to * according aggregated values. The data can be further modified or directly be linked to
* a Hazelcast Jet sink. * a Hazelcast Jet sink.
*/ */
public StreamStage<Entry<String, ActivePowerRecord>> extendUc4Topology(// NOPMD public StreamStage<Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(// NOPMD
final Pipeline pipe, final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> inputSource, final StreamSource<Entry<String, ActivePowerRecord>> inputSource,
final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource, final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource,
final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) { final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) {
////////////////////////////////// //////////////////////////////////
...@@ -167,7 +184,13 @@ public class Uc4PipelineBuilder { ...@@ -167,7 +184,13 @@ public class Uc4PipelineBuilder {
// (1) Aggregation Stream // (1) Aggregation Stream
final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe
.readFrom(aggregationSource) .readFrom(aggregationSource)
.withNativeTimestamps(0); .withNativeTimestamps(0)
.map(entry -> { // Map Aggregated to ActivePowerRecord
final AggregatedActivePowerRecord agg = entry.getValue();
final ActivePowerRecord record = new ActivePowerRecord(
agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
return Util.entry(entry.getKey(), record);
});
////////////////////////////////// //////////////////////////////////
// (2) UC4 Merge Input with aggregation stream // (2) UC4 Merge Input with aggregation stream
...@@ -230,24 +253,33 @@ public class Uc4PipelineBuilder { ...@@ -230,24 +253,33 @@ public class Uc4PipelineBuilder {
windowedLastValues = dupliAsFlatmappedStage windowedLastValues = dupliAsFlatmappedStage
.window(WindowDefinition.tumbling(windowSize)); .window(WindowDefinition.tumbling(windowSize));
////////////////////////////////// final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>,
// (6) UC4 GroupBy and aggregate and map AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp =
// Group using the group out of the sensorGroupKey keys AggregateOperation
.withCreate(AggregatedActivePowerRecordAccumulator::new)
.<Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> {
acc.setId(rec.getKey().getGroup());
acc.addInputs(rec.getValue());
})
// .andCombine((acc, rec)-> new AggregatedActivePowerRecordAccumulator()) // NOCS
// .andDeduct((acc, rec) -> new AggregatedActivePowerRecordAccumulator()) // NOCS
.andExportFinish(acc ->
new AggregatedActivePowerRecord(acc.getId(),
acc.getTimestamp(),
acc.getCount(),
acc.getSumInW(),
acc.getAverageInW())
);
// write aggregation back to kafka
return windowedLastValues return windowedLastValues
.groupingKey(entry -> entry.getKey().getGroup()) .groupingKey(entry -> entry.getKey().getGroup())
.aggregate(AggregateOperations.summingDouble(entry -> entry.getValue().getValueInW())) .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
.map(agg -> {
// Construct data for return pair
final String theGroup = agg.getKey();
final Double summedValueInW = agg.getValue();
// Return aggregates group value pair
return Util.entry(
theGroup,
new ActivePowerRecord(theGroup, System.currentTimeMillis(), summedValueInW));
});
} }
/** /**
* FlatMap function used to process the configuration input for UC4. * FlatMap function used to process the configuration input for UC4.
*/ */
......
package theodolite.uc4.application.uc4specifics;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Accumulator class for AggregatedActivePowerRecords.
*/
public class AggregatedActivePowerRecordAccumulator {
private String id;
private long timestamp;
private long count;
private double sumInW;
private double averageInW;
/**
* Default constructor.
*/
public AggregatedActivePowerRecordAccumulator() {
// This constructor is intentionally empty. Nothing special is needed here.
}
/**
* Creates an AggregationObject.
*/
public AggregatedActivePowerRecordAccumulator(final String id,
final long timestamp,
final long count,
final double sumInW,
final double averageInW) {
this.id = id;
this.timestamp = timestamp;
this.count = count;
this.sumInW = sumInW;
this.averageInW = averageInW;
}
/**
* Sets the id.
*/
public void setId(final String id) {
this.id = id;
}
/**
* Adds the record to the aggregation.
*/
public void addInputs(final ActivePowerRecord record) {
this.count += 1;
this.sumInW += record.getValueInW();
this.timestamp = record.getTimestamp();
this.averageInW = sumInW / count;
}
public long getCount() {
return count;
}
public double getSumInW() {
return sumInW;
}
public double getAverageInW() {
return averageInW;
}
public String getId() {
return id;
}
public long getTimestamp() {
return timestamp;
}
}
...@@ -27,6 +27,7 @@ import theodolite.uc4.application.uc4specifics.ValueGroup; ...@@ -27,6 +27,7 @@ import theodolite.uc4.application.uc4specifics.ValueGroup;
import theodolite.uc4.application.uc4specifics.ValueGroupSerializer; import theodolite.uc4.application.uc4specifics.ValueGroupSerializer;
import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
import titan.ccp.model.sensorregistry.MachineSensor; import titan.ccp.model.sensorregistry.MachineSensor;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
...@@ -38,7 +39,7 @@ public class Uc4PipelineTest extends JetTestSupport { ...@@ -38,7 +39,7 @@ public class Uc4PipelineTest extends JetTestSupport {
// TEst Machinery // TEst Machinery
JetInstance testInstance = null; JetInstance testInstance = null;
Pipeline testPipeline = null; Pipeline testPipeline = null;
StreamStage<Entry<String, ActivePowerRecord>> uc4Topology = null; StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Topology = null;
@Before @Before
public void buildUc4Pipeline() { public void buildUc4Pipeline() {
...@@ -67,12 +68,22 @@ public class Uc4PipelineTest extends JetTestSupport { ...@@ -67,12 +68,22 @@ public class Uc4PipelineTest extends JetTestSupport {
return testEntry; return testEntry;
}); });
final AggregatedActivePowerRecord.Builder aggregationBuilder = AggregatedActivePowerRecord.newBuilder();
// Create test source 2 : Mock aggregation Values // Create test source 2 : Mock aggregation Values
final StreamSource<Entry<String, ActivePowerRecord>> testAggregationSource = final StreamSource<Entry<String, AggregatedActivePowerRecord>> testAggregationSource =
TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> {
AggregatedActivePowerRecord test =
new AggregatedActivePowerRecord(testSensorName,
System.currentTimeMillis(),
1L,
testValueInW,
testValueInW);
final ActivePowerRecord testAggValue = new ActivePowerRecord(testSensorName,System.currentTimeMillis(),testValueInW); final ActivePowerRecord testAggValue = new ActivePowerRecord(testSensorName,System.currentTimeMillis(),testValueInW);
final Entry<String, ActivePowerRecord> testEntry = final Entry<String, AggregatedActivePowerRecord> testEntry =
Map.entry(testLevel1GroupName, testAggValue); Map.entry(testLevel1GroupName, test);
return testEntry; return testEntry;
}); });
...@@ -114,7 +125,7 @@ public class Uc4PipelineTest extends JetTestSupport { ...@@ -114,7 +125,7 @@ public class Uc4PipelineTest extends JetTestSupport {
@Test @Test
public void testOutput() { public void testOutput() {
System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1"); // System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1");
// Assertion Configuration // Assertion Configuration
int timeout = 10; int timeout = 10;
...@@ -127,7 +138,7 @@ public class Uc4PipelineTest extends JetTestSupport { ...@@ -127,7 +138,7 @@ public class Uc4PipelineTest extends JetTestSupport {
this.uc4Topology.apply(Assertions.assertCollectedEventually(timeout, this.uc4Topology.apply(Assertions.assertCollectedEventually(timeout,
collection -> { collection -> {
System.out.println("DEBUG DEBUG DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY"); System.out.println("DEBUG DEBUG DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY");
Thread.sleep(2000); Thread.sleep(20_000);
boolean allOkay = true; boolean allOkay = true;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment