Skip to content
Snippets Groups Projects
Commit 81fd7eac authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Change output + Refactoring + Set id

new output: AggregatedPowerRecords
refactoring: remove inferable types
set id in combine function of aggregator
parent 16bf2a81
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
......@@ -83,16 +83,14 @@ public class Uc4PipelineBuilder {
final Pipeline uc4Pipeline = Pipeline.create();
// Sources for this use case
final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka(
kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
final StreamSource<Entry<Event, String>> configSource =
KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
KafkaSources.<String, ActivePowerRecord>kafka(
kafkaInputReadPropsForPipeline, kafkaInputTopic);
KafkaSources.kafka(kafkaInputReadPropsForPipeline, kafkaInputTopic);
final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource =
KafkaSources.<String, AggregatedActivePowerRecord>
kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
// Extend UC4 topology to pipeline
final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
......@@ -106,22 +104,10 @@ public class Uc4PipelineBuilder {
// Log aggregation product
uc4Aggregation.writeTo(Sinks.logger());
// Map Aggregated to ActivePowerRecord
final StreamStage<Entry<String, ActivePowerRecord>> uc4Product = uc4Aggregation
.map(entry -> {
final AggregatedActivePowerRecord agg = entry.getValue();
final ActivePowerRecord record = new ActivePowerRecord(
agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
return Util.entry(entry.getKey(), record);
});
// Add Sink2: Write back to kafka output topic
uc4Product.writeTo(KafkaSinks.kafka(
uc4Aggregation.writeTo(KafkaSinks.kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
// Logger uc4 product
uc4Product.writeTo(Sinks.logger());
// Return the pipeline
return uc4Pipeline;
}
......@@ -261,8 +247,9 @@ public class Uc4PipelineBuilder {
acc.setId(rec.getKey().getGroup());
acc.addInputs(rec.getValue());
})
// .andCombine((acc, rec)-> new AggregatedActivePowerRecordAccumulator()) // NOCS
// .andDeduct((acc, rec) -> new AggregatedActivePowerRecordAccumulator()) // NOCS
.andCombine((acc, acc2) ->
acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp()))
.andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount()))
.andExportFinish(acc ->
new AggregatedActivePowerRecord(acc.getId(),
acc.getTimestamp(),
......@@ -271,8 +258,8 @@ public class Uc4PipelineBuilder {
acc.getAverageInW())
);
// write aggregation back to kafka
return windowedLastValues
.groupingKey(entry -> entry.getKey().getGroup())
.aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
......
......@@ -56,7 +56,8 @@ public class AggregatedActivePowerRecordAccumulator {
/**
* Adds the records from another aggregator.
*/
public void addInputs(final double sumInW, final long count, final long timestamp) {
public void addInputs(final String id, final double sumInW, final long count, final long timestamp) {
this.id = this.id == null ? id : this.id;
this.sumInW += sumInW;
this.count += count;
this.timestamp = Math.max(this.timestamp, timestamp);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment