Skip to content
Snippets Groups Projects
Commit 71cad78f authored by Sören Henning's avatar Sören Henning
Browse files

Set Kafka parallelism in Hazelcast Jet

parent d8159dac
Branches
Tags
No related merge requests found
Pipeline #10136 failed
...@@ -23,6 +23,7 @@ public class Uc1PipelineFactory extends PipelineFactory { ...@@ -23,6 +23,7 @@ public class Uc1PipelineFactory extends PipelineFactory {
/** /**
* Creates a new Uc1PipelineFactory. * Creates a new Uc1PipelineFactory.
*
* @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes. * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
*/ */
...@@ -41,7 +42,8 @@ public class Uc1PipelineFactory extends PipelineFactory { ...@@ -41,7 +42,8 @@ public class Uc1PipelineFactory extends PipelineFactory {
// Define the Kafka Source // Define the Kafka Source
final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline,
this.kafkaInputTopic);
// Extend UC1 topology to the pipeline // Extend UC1 topology to the pipeline
final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource); final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource);
...@@ -57,7 +59,7 @@ public class Uc1PipelineFactory extends PipelineFactory { ...@@ -57,7 +59,7 @@ public class Uc1PipelineFactory extends PipelineFactory {
uc1TopologyProduct.writeTo(sink); uc1TopologyProduct.writeTo(sink);
return pipe; return this.pipe;
} }
/** /**
...@@ -72,13 +74,13 @@ public class Uc1PipelineFactory extends PipelineFactory { ...@@ -72,13 +74,13 @@ public class Uc1PipelineFactory extends PipelineFactory {
* @return A {@code StreamStage<String>} with the above definition of the String. It can be used * @return A {@code StreamStage<String>} with the above definition of the String. It can be used
* to be further modified or directly be written into a sink. * to be further modified or directly be written into a sink.
*/ */
public StreamStage<String> public StreamStage<String> extendUc1Topology(
extendUc1Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology // Build the pipeline topology
return pipe.readFrom(source) return this.pipe.readFrom(source)
.withNativeTimestamps(0) .withNativeTimestamps(0)
.setLocalParallelism(1) // .setLocalParallelism(1)
.setName("Convert content") .setName("Convert content")
.map(Map.Entry::getValue) .map(Map.Entry::getValue)
.map(this.databaseAdapter.getRecordConverter()::convert); .map(this.databaseAdapter.getRecordConverter()::convert);
......
...@@ -90,7 +90,7 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -90,7 +90,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
// Build the pipeline topology. // Build the pipeline topology.
return this.pipe.readFrom(source) return this.pipe.readFrom(source)
.withNativeTimestamps(0) .withNativeTimestamps(0)
.setLocalParallelism(1) // .setLocalParallelism(1)
.groupingKey(record -> record.getValue().getIdentifier()) .groupingKey(record -> record.getValue().getIdentifier())
.window(WindowDefinition.tumbling(this.downsampleInterval.toMillis())) .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis()))
.aggregate(StatsAggregatorFactory.create()) .aggregate(StatsAggregatorFactory.create())
......
...@@ -106,7 +106,7 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -106,7 +106,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
.readFrom(source) .readFrom(source)
// use Timestamps // use Timestamps
.withNativeTimestamps(0) .withNativeTimestamps(0)
.setLocalParallelism(1) // .setLocalParallelism(1)
// Map timestamp to hour of day and create new key using sensorID and // Map timestamp to hour of day and create new key using sensorID and
// datetime mapped to HourOfDay // datetime mapped to HourOfDay
.map(record -> { .map(record -> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment