From 8da7f5febd306722336d75e1a1ea389f8590ead7 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Sun, 5 Jun 2022 23:42:01 +0200 Subject: [PATCH] Add getter for pipe in PipelineFactory for hzj pipeline tests --- .../commons/hazelcastjet/PipelineFactory.java | 7 +++++++ .../uc4/hazelcastjet/Uc4PipelineFactory.java | 6 ++---- .../benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java | 10 ++++------ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java index 6f1b2d935..e9d2baf6d 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java @@ -25,10 +25,12 @@ public abstract class PipelineFactory { protected String kafkaOutputTopic; + public PipelineFactory() { this.pipe = Pipeline.create(); } + /** * Constructs a pipeline factory with read properties and input topic. * Directly used for Uc1. @@ -58,4 +60,9 @@ public abstract class PipelineFactory { */ public abstract Pipeline buildPipeline(); + public Pipeline getPipe() { + return this.pipe; + } + + } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index be617085a..166f27588 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -102,7 +102,7 @@ public class Uc4PipelineFactory extends PipelineFactory { // Extend UC4 topology to pipeline final StreamStage<Map.Entry<String, AggregatedActivePowerRecord>> uc4Aggregation = - this.extendUc4Topology(pipe, inputSource, aggregationSource, configSource); + this.extendUc4Topology(inputSource, aggregationSource, configSource); // Add Sink2: Write back to kafka feedback/aggregation topic uc4Aggregation.writeTo(KafkaSinks.kafka( @@ -140,7 +140,6 @@ public class Uc4PipelineFactory extends PipelineFactory { * (6) Aggregate data over the window * </p> * - * @param pipe The blank pipeline to extend the logic to. * @param inputSource A streaming source with {@code ActivePowerRecord} data. * @param aggregationSource A streaming source with aggregated data. * @param configurationSource A streaming source delivering a {@code SensorRegistry}. @@ -150,8 +149,7 @@ public class Uc4PipelineFactory extends PipelineFactory { */ public StreamStage <Map.Entry<String, AggregatedActivePowerRecord>> - extendUc4Topology(final Pipeline pipe, - final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, + extendUc4Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> aggregationSource, final StreamSource<Map.Entry<Event, String>> configurationSource) { diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java index 2db397b37..fcc42aed6 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java @@ -121,17 +121,15 @@ public class Uc4PipelineTest extends JetTestSupport { }); // Create pipeline to test - this.testPipeline = Pipeline.create(); - final Properties properties = new Properties(); final Uc4PipelineFactory factory = new Uc4PipelineFactory( properties,properties,properties,properties,"","", "","", testWindowSize); - this.uc4Topology = factory.extendUc4Topology( - testPipeline, testInputSource, testAggregationSource, testConfigSource); - + this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); this.uc4Topology.writeTo(Sinks.logger()); + + this.testPipeline = factory.getPipe(); } /** @@ -217,7 +215,7 @@ public class Uc4PipelineTest extends JetTestSupport { "Job was expected to complete with AssertionCompletedException, but completed with: " + e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName())); - } catch (Exception e){ + } catch (final Exception e){ LOGGER.error("Test is broken",e); } } -- GitLab