diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/InitialElementProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/InitialElementProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..8a303610ad4d2af4b69242d2cca45a1c875b1698 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/InitialElementProducer.java @@ -0,0 +1,21 @@ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + +public class InitialElementProducer<T> extends ProducerStage<T> { + + private final T[] elements; + + public InitialElementProducer(final T... elements) { + this.elements = elements; + } + + @Override + protected void execute() { + for (T e : this.elements) { + this.send(this.outputPort, e); + } + this.terminate(); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java index 599d9fb490911009ce4f088a897f9c123bcc4b85..663c83ecf2e4e9118810f0388f8b1dbaf138a70e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java @@ -54,13 +54,9 @@ public class ObjectProducer<T> extends ProducerStage<T> { @Override protected void execute() { - // this.logger.debug("Executing object producer..."); - - T newObject = null; - newObject = this.inputObjectCreator.create(); + T newObject = this.inputObjectCreator.create(); this.numInputObjects--; - // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); this.send(this.outputPort, newObject); if (this.numInputObjects == 0) { diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java index 659a9cf27517c840cacf57afdfc4894cab53c2fc..99a250518b65b998f2e509615e55d54d1d95b279 100644 --- a/src/main/java/util/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -14,12 +14,12 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; +import teetime.variant.methodcallWithPorts.stage.InitialElementProducer; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; @@ -40,21 +40,20 @@ public class KiekerLoadDriver { this.runnableStage = new RunnableStage(producerPipeline); } - private HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { + private HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages + InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); - pipeline.setFirstStage(dir2RecordsFilter); + final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>(); + pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); - dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); + SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); - dir2RecordsFilter.getInputPort().getPipe().add(directory); - return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java index 5d27666cf810daf6802680742aef0c62bbe86f9f..97d4d8680f700b1ac2b51d5d05107e6405a1ac03 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java @@ -1,8 +1,8 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; public class TcpTraceLogging extends Analysis { @@ -12,7 +12,7 @@ public class TcpTraceLogging extends Analysis { @Override public void init() { super.init(); - StageWithPort tcpPipeline = this.buildTcpPipeline(); + HeadStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } @@ -29,7 +29,7 @@ public class TcpTraceLogging extends Analysis { } } - private StageWithPort buildTcpPipeline() { + private HeadStage buildTcpPipeline() { // TCPReaderSink tcpReader = new TCPReaderSink(); TCPReader tcpReader = new TCPReader();