diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java index 87a528a9313f3d949e394112602f21b712bd622a..c987e3ada01f955c4f3507eb8ca4d64130302126 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineFactory.java @@ -23,12 +23,13 @@ public class Uc1PipelineFactory extends PipelineFactory { /** * Creates a new Uc1PipelineFactory. + * * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes. * @param kafkaInputTopic The name of the input topic used for the pipeline. */ public Uc1PipelineFactory(final Properties kafkaReadPropsForPipeline, - final String kafkaInputTopic) { - super(kafkaReadPropsForPipeline,kafkaInputTopic); + final String kafkaInputTopic) { + super(kafkaReadPropsForPipeline, kafkaInputTopic); } /** @@ -41,7 +42,8 @@ public class Uc1PipelineFactory extends PipelineFactory { // Define the Kafka Source final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); + KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline, + this.kafkaInputTopic); // Extend UC1 topology to the pipeline final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource); @@ -57,7 +59,7 @@ public class Uc1PipelineFactory extends PipelineFactory { uc1TopologyProduct.writeTo(sink); - return pipe; + return this.pipe; } /** @@ -72,13 +74,13 @@ public class Uc1PipelineFactory extends PipelineFactory { * @return A {@code StreamStage<String>} with the above definition of the String. It can be used * to be further modified or directly be written into a sink. */ - public StreamStage<String> - extendUc1Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { + public StreamStage<String> extendUc1Topology( + final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { // Build the pipeline topology - return pipe.readFrom(source) + return this.pipe.readFrom(source) .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) .setName("Convert content") .map(Map.Entry::getValue) .map(this.databaseAdapter.getRecordConverter()::convert); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index 52096b86083b71f2e0cb80cd8cd9a1040ac8cb68..209234d4bd0d3081f8d4f9f92f9db848ccc46e4f 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -90,7 +90,7 @@ public class Uc2PipelineFactory extends PipelineFactory { // Build the pipeline topology. return this.pipe.readFrom(source) .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) .groupingKey(record -> record.getValue().getIdentifier()) .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis())) .aggregate(StatsAggregatorFactory.create()) diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index 69fe67756e0922085f795cbce4b36a9d23ba9121..d31854ec70ce0ef04e978da67fab35f786003db9 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -106,7 +106,7 @@ public class Uc3PipelineFactory extends PipelineFactory { .readFrom(source) // use Timestamps .withNativeTimestamps(0) - .setLocalParallelism(1) + // .setLocalParallelism(1) // Map timestamp to hour of day and create new key using sensorID and // datetime mapped to HourOfDay .map(record -> {