diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java index 6f1b2d935c39bcc63e0b02ebbae0f97a71b90cee..e9d2baf6dcb5b81b6ef7c8668458cd505dd7aa0b 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/PipelineFactory.java @@ -25,10 +25,12 @@ public abstract class PipelineFactory { protected String kafkaOutputTopic; + public PipelineFactory() { this.pipe = Pipeline.create(); } + /** * Constructs a pipeline factory with read properties and input topic. * Directly used for Uc1. @@ -58,4 +60,9 @@ public abstract class PipelineFactory { */ public abstract Pipeline buildPipeline(); + public Pipeline getPipe() { + return this.pipe; + } + + } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index be617085ad970a4095cbcacc91c8b7b05d1d13b8..166f27588c09cc8821d98f5e9fae8e7d12bfa743 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -102,7 +102,7 @@ public class Uc4PipelineFactory extends PipelineFactory { // Extend UC4 topology to pipeline final StreamStage<Map.Entry<String, AggregatedActivePowerRecord>> uc4Aggregation = - this.extendUc4Topology(pipe, inputSource, aggregationSource, configSource); + this.extendUc4Topology(inputSource, aggregationSource, configSource); // Add Sink2: Write back to kafka feedback/aggregation topic uc4Aggregation.writeTo(KafkaSinks.kafka( @@ -140,7 +140,6 @@ public class Uc4PipelineFactory extends PipelineFactory { * (6) Aggregate data over the window * </p> * - * @param pipe The blank pipeline to extend the logic to. * @param inputSource A streaming source with {@code ActivePowerRecord} data. * @param aggregationSource A streaming source with aggregated data. * @param configurationSource A streaming source delivering a {@code SensorRegistry}. @@ -150,8 +149,7 @@ public class Uc4PipelineFactory extends PipelineFactory { */ public StreamStage <Map.Entry<String, AggregatedActivePowerRecord>> - extendUc4Topology(final Pipeline pipe, - final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, + extendUc4Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> aggregationSource, final StreamSource<Map.Entry<Event, String>> configurationSource) { diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java index 2db397b37ddbc7f043fdbe63b52476c9bc146f0c..fcc42aed623873e61434b2aa003698ebd5dc062f 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java @@ -121,17 +121,15 @@ public class Uc4PipelineTest extends JetTestSupport { }); // Create pipeline to test - this.testPipeline = Pipeline.create(); - final Properties properties = new Properties(); final Uc4PipelineFactory factory = new Uc4PipelineFactory( properties,properties,properties,properties,"","", "","", testWindowSize); - this.uc4Topology = factory.extendUc4Topology( - testPipeline, testInputSource, testAggregationSource, testConfigSource); - + this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); this.uc4Topology.writeTo(Sinks.logger()); + + this.testPipeline = factory.getPipe(); } /** @@ -217,7 +215,7 @@ public class Uc4PipelineTest extends JetTestSupport { "Job was expected to complete with AssertionCompletedException, but completed with: " + e.getCause(), errorMsg.contains(AssertionCompletedException.class.getName())); - } catch (Exception e){ + } catch (final Exception e){ LOGGER.error("Test is broken",e); } }