From 71cad78f071d58bc138d6f818c4e6fbf03a3c096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Fri, 25 Nov 2022 16:56:10 +0100 Subject: [PATCH] Set Kafka parallelism in Hazelcast Jet --- .../uc1/hazelcastjet/Uc1PipelineFactory.java | 18 ++++++++++-------- .../uc2/hazelcastjet/Uc2PipelineFactory.java | 2 +- .../uc3/hazelcastjet/Uc3PipelineFactory.java | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java index 87a528a93..c987e3ada 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java @@ -23,12 +23,13 @@ public class Uc1PipelineFactory extends PipelineFactory { /** * Creates a new Uc1PipelineFactory. + * * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes. * @param kafkaInputTopic The name of the input topic used for the pipeline. */ public Uc1PipelineFactory(final Properties kafkaReadPropsForPipeline, - final String kafkaInputTopic) { - super(kafkaReadPropsForPipeline,kafkaInputTopic); + final String kafkaInputTopic) { + super(kafkaReadPropsForPipeline, kafkaInputTopic); } /** @@ -41,7 +42,8 @@ public class Uc1PipelineFactory extends PipelineFactory { // Define the Kafka Source final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); + KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline, + this.kafkaInputTopic); // Extend UC1 topology to the pipeline final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource); @@ -57,7 +59,7 @@ public class Uc1PipelineFactory extends PipelineFactory { uc1TopologyProduct.writeTo(sink); - return pipe; + return this.pipe; } /** @@ -72,13 +74,13 @@ public class Uc1PipelineFactory extends PipelineFactory { * @return A {@code StreamStage<String>} with the above definition of the String. It can be used * to be further modified or directly be written into a sink. */ - public StreamStage<String> - extendUc1Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { + public StreamStage<String> extendUc1Topology( + final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { // Build the pipeline topology - return pipe.readFrom(source) + return this.pipe.readFrom(source) .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) .setName("Convert content") .map(Map.Entry::getValue) .map(this.databaseAdapter.getRecordConverter()::convert); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index 52096b860..209234d4b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -90,7 +90,7 @@ public class Uc2PipelineFactory extends PipelineFactory { // Build the pipeline topology. return this.pipe.readFrom(source) .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) .groupingKey(record -> record.getValue().getIdentifier()) .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis())) .aggregate(StatsAggregatorFactory.create()) diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index 69fe67756..d31854ec7 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -106,7 +106,7 @@ public class Uc3PipelineFactory extends PipelineFactory { .readFrom(source) // use Timestamps .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) // Map timestamp to hour of day and create new key using sensorID and // datetime mapped to HourOfDay .map(record -> { -- GitLab