diff --git a/hints.txt b/doc/hints.txt similarity index 100% rename from hints.txt rename to doc/hints.txt diff --git a/necessary stage types.txt b/doc/necessary stage types.txt similarity index 100% rename from necessary stage types.txt rename to doc/necessary stage types.txt diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index 04c2ecfd87f17b33681fd9a7b503842beacc53b2..6b2a9378342364001ca044dc632f18e7b42d201a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; @@ -45,14 +46,15 @@ public class MethodCallThroughputAnalysis9 extends Analysis { @Override public void init() { super.init(); - this.runnable = this.buildPipeline(); + StageWithPort pipeline = this.buildPipeline(); + this.runnable = new RunnableStage(pipeline); } /** * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline() { + private StageWithPort buildPipeline() { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -64,12 +66,12 @@ public class MethodCallThroughputAnalysis9 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); - pipeline.setFirstStage(objectProducer, null); + final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); - pipeline.setLastStage(collectorSink, null); + pipeline.setLastStage(collectorSink); Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); @@ -79,7 +81,7 @@ public class MethodCallThroughputAnalysis9 extends Analysis { Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java index 4c8fac3f934028502d6cb8ad8c36c90fa5c8c1de..19dc2d6e76534bfc5e661654826f20938cf1fc70 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java @@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); + final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java index db3ae9e49364b1c5233d89d80d0d66048d6b0df9..2d57a02c6c1eb479daef864faf0cedf3954a8f9a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; @@ -45,15 +46,11 @@ public class MethodCallThroughputAnalysis11 extends Analysis { @Override public void init() { super.init(); - Pipeline<Void, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); - this.runnable = new RunnableStage<Void>(pipeline); + StageWithPort pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); + this.runnable = new RunnableStage(pipeline); } - /** - * @param numNoopFilters - * @since 1.10 - */ - private Pipeline<Void, Void> buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private StageWithPort buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -67,7 +64,7 @@ public class MethodCallThroughputAnalysis11 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); + final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); // pipeline.addIntermediateStage(relayFake); pipeline.addIntermediateStage(startTimestampFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java index 1a9044123ac6f9872fdf4dd2b03df72307e715bf..e62d0572dafbe7de40df68b01811dd8ee3dfad34 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; @@ -47,14 +48,15 @@ public class MethodCallThroughputAnalysis14 extends Analysis { @Override public void init() { super.init(); - this.runnable = this.buildPipeline(); + StageWithPort pipeline = this.buildPipeline(); + this.runnable = new RunnableStage(pipeline); } /** * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline() { + private StageWithPort buildPipeline() { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -66,7 +68,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); + final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); @@ -81,7 +83,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis { SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY); SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort(), SPSC_INITIAL_CAPACITY); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java index b285f8fca81ab51e9c0d9f98d540e1e146a1856b..4fb5db966f945e91348ab691fe53662eb875546a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -22,17 +22,18 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter; import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; import teetime.variant.methodcallWithPorts.stage.basic.Delay; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; /** * @author Christian Wulf @@ -57,28 +58,31 @@ public class MethodCallThroughputAnalysis15 extends Analysis { public void init() { super.init(); - this.clockRunnable = this.buildClockPipeline(); - this.runnable = this.buildPipeline(this.clock); + StageWithPort clockPipeline = this.buildClockPipeline(); + this.clockRunnable = new RunnableStage(clockPipeline); + + StageWithPort pipeline = this.buildPipeline(this.clock); + this.runnable = new RunnableStage(pipeline); } - private Runnable buildClockPipeline() { + private StageWithPort buildClockPipeline() { this.clock = new Clock(); this.clock.setInitialDelayInMs(100); this.clock.setIntervalDelayInMs(100); - final Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + final Pipeline<Clock, Sink<Long>> pipeline = new Pipeline<Clock, Sink<Long>>(); pipeline.setFirstStage(this.clock); - pipeline.setLastStage(new EndStage<Long>()); + pipeline.setLastStage(new Sink<Long>()); - return new RunnableStage(pipeline); + return pipeline; } /** * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final Clock clock) { + private StageWithPort buildPipeline(final Clock clock) { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -91,7 +95,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); + final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); @@ -111,7 +115,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis { SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java index 1e82e3fce004ffe7db2277c5f6ea921d8c83951f..f204497b5c2bac8bea7baf36936bd73994114ba9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java @@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -51,7 +50,6 @@ public class MethodCallThroughputAnalysis16 extends Analysis { private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Distributor<TimestampObject> distributor; private Thread producerThread; private Thread[] workerThreads; @@ -61,8 +59,9 @@ public class MethodCallThroughputAnalysis16 extends Analysis { @Override public void init() { super.init(); - Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableStage<Void>(producerPipeline)); + Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + this.inputObjectCreator); + this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); @@ -71,20 +70,21 @@ public class MethodCallThroughputAnalysis16 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Pipeline<TimestampObject, Void> workerPipeline = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(new RunnableStage<TimestampObject>(workerPipeline)); + Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); + this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline)); } } - private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); - this.distributor = new Distributor<TimestampObject>(); + Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.setLastStage(this.distributor); + pipeline.setLastStage(distributor); - SingleElementPipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); + SingleElementPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } @@ -93,7 +93,9 @@ public class MethodCallThroughputAnalysis16 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Pipeline<TimestampObject, Void> buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { + private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, + final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -105,14 +107,14 @@ public class MethodCallThroughputAnalysis16 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>(); + final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); + SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); SingleElementPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index c26772746ff420cba334ba28bb04a794da56e888..652ccbdc2aa43ac9157612fd409cfa389ac1bc31 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -30,7 +30,6 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; import teetime.variant.methodcallWithPorts.stage.Relay; @@ -60,8 +59,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { @Override public void init() { - final Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableStage<Void>(producerPipeline)); + final StageWithPort producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); + this.producerThread = new Thread(new RunnableStage(producerPipeline)); int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose @@ -95,7 +94,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { // this.producerThread.start(); // this.producerThread.run(); - new RunnableStage<Void>(producerPipeline).run(); + new RunnableStage(producerPipeline).run(); // try { // this.producerThread.join(); @@ -107,14 +106,14 @@ public class MethodCallThroughputAnalysis17 extends Analysis { super.init(); } - private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private StageWithPort buildProducerPipeline(final int numInputObjects, + final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); Sink<TimestampObject> sink = new Sink<TimestampObject>(); - EndStage<Void> endStage = new EndStage<Void>(); - endStage.closure = inputObjectCreator; + Sink<Void> endStage = new Sink<Void>(); - final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); // pipeline.setFirstStage(sink); // pipeline.setFirstStage(endStage); @@ -127,7 +126,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { // objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); - distributor.getOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>()); + distributor.getNewOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>()); return pipeline; } @@ -136,7 +135,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { + private Runnable buildPipeline(final StageWithPort previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); // create stages final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); @@ -148,7 +147,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>(); + final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java index 9efe097ee64e9a1b077e21f4f59a0049ab7e8b9d..cb95597bfcce10321cf747c949515ea619a19952 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java @@ -51,7 +51,6 @@ public class MethodCallThroughputAnalysis18 extends Analysis { private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Distributor<TimestampObject> distributor; private Thread producerThread; private Thread[] workerThreads; @@ -61,7 +60,8 @@ public class MethodCallThroughputAnalysis18 extends Analysis { @Override public void init() { super.init(); - Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); + Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); @@ -71,20 +71,21 @@ public class MethodCallThroughputAnalysis18 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(workerRunnable); + StageWithPort pipeline = this.buildPipeline(producerPipeline, resultList); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); - this.distributor = new Distributor<TimestampObject>(); + Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.setLastStage(this.distributor); + pipeline.setLastStage(distributor); - UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } @@ -93,7 +94,9 @@ public class MethodCallThroughputAnalysis18 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { + private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, + final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -105,14 +108,14 @@ public class MethodCallThroughputAnalysis18 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>(); + final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); + SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); @@ -123,7 +126,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis { UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java index 6df61a384bf8d305a263c103a9bc5dc96b0799bf..f31ad97eb15ebb506ba6fc07b99fa2c359227ab9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java @@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -51,7 +50,6 @@ public class MethodCallThroughputAnalysis19 extends Analysis { private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Distributor<TimestampObject> distributor; private Thread producerThread; private Thread[] workerThreads; @@ -61,7 +59,8 @@ public class MethodCallThroughputAnalysis19 extends Analysis { @Override public void init() { super.init(); - Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); + Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); @@ -71,30 +70,27 @@ public class MethodCallThroughputAnalysis19 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList); + Runnable workerRunnable = this.buildPipeline(producerPipeline.getLastStage(), resultList); this.workerThreads[i] = new Thread(workerRunnable); } } - private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); - this.distributor = new Distributor<TimestampObject>(); + Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.setLastStage(this.distributor); + pipeline.setLastStage(distributor); - OrderedGrowableArrayPipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); + OrderedGrowableArrayPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } - /** - * @param numNoopFilters - * @since 1.10 - */ - private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { + private Runnable buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -106,14 +102,14 @@ public class MethodCallThroughputAnalysis19 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>(); + final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); + SpScPipe.connect(previousStage.getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); OrderedGrowableArrayPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java index 1f244d8a332fce0a68f9a75b1bf27ee2610daed2..fe7e3ead6b1ad5490608883e46ae017d0de010d6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java @@ -44,7 +44,7 @@ import kieker.common.util.registry.Lookup; * * @since 1.10 */ -public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { +public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { private static final int MESSAGE_BUFFER_SIZE = 65535; @@ -92,7 +92,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { } @Override - protected void execute5(final Void element) { + protected void execute() { ServerSocketChannel serversocket = null; try { serversocket = ServerSocketChannel.open(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java index 55b5b4004a75f0d7aa40f0e247cc02d19b026098..61e89b210e64c2c605f41eabd730741baa7fd823 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java @@ -4,8 +4,6 @@ import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; -import kieker.common.record.IMonitoringRecord; - public class TcpTraceLogging extends Analysis { private Thread tcpThread; @@ -13,8 +11,8 @@ public class TcpTraceLogging extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); + StageWithPort tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } @Override @@ -30,7 +28,7 @@ public class TcpTraceLogging extends Analysis { } } - private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { + private StageWithPort buildTcpPipeline() { TCPReaderSink tcpReader = new TCPReaderSink(); // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index 6328a5d70102374f7a978fca2b2e9ba30296d2fc..5c5c568ab4b0a7e9cfed8093b2fa31faa5365995 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -41,41 +41,41 @@ public class TcpTraceReconstruction extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline); - this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); + StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage()); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { + private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) { + private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); @@ -83,7 +83,7 @@ public class TcpTraceReconstruction extends Analysis { SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index d73c8032a9f6562ed3b06cd01a6f5385eae6e479..b450cc2055e162988a98dbcbedd77a59bea8efd5 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -15,9 +15,9 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -49,35 +49,35 @@ public class TcpTraceReduction extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(5000); - this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); + Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); + this.clockThread = new Thread(new RunnableStage(clockStage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); - this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); + StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage()); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { + private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -86,24 +86,23 @@ public class TcpTraceReduction extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, - final StageWithPort<Void, Long> clockStage) { + private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); @@ -111,10 +110,10 @@ public class TcpTraceReduction extends Analysis { SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort()); SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort()); - SpScPipe.connect(clockStage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java index bc99338571a85498c2cbb3895c4eb4e25f0954d9..b0b778abd40f30ccad76a6555008b136220af182 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java @@ -22,6 +22,7 @@ import java.util.List; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -46,21 +47,21 @@ public class RecordReaderAnalysis extends Analysis { @Override public void init() { super.init(); - Pipeline<File, ?> producerPipeline = this.buildProducerPipeline(); - this.producerThread = new Thread(new RunnableStage<File>(producerPipeline)); + StageWithPort producerPipeline = this.buildProducerPipeline(); + this.producerThread = new Thread(new RunnableStage(producerPipeline)); } - private Pipeline<File, Void> buildProducerPipeline() { + private StageWithPort buildProducerPipeline() { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final Pipeline<File, Void> pipeline = new Pipeline<File, Void>(); + final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.setLastStage(collector); - SpScPipe.connect(null, dir2RecordsFilter.getInputPort(), 1); + dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs")); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 2337ec8f6998ba96c0bfa8769817bfbeef6ef3df..0a228f586cd9e5774a1141eadabd55f4ea229949 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -11,7 +11,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; -import teetime.variant.methodcallWithPorts.stage.EndStage; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; @@ -25,7 +25,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { private Counter<IMonitoringRecord> recordCounter; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; - private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clockStage = new Clock(); clockStage.setInitialDelayInMs(intervalDelayInMs); clockStage.setIntervalDelayInMs(intervalDelayInMs); @@ -34,26 +34,26 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clockStage); pipeline.setLastStage(distributor); return pipeline; } - private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline(final StageWithPort<Void, Long> clockPipeline) { + private StageWithPort buildTcpPipeline(final Distributor<Long> previousClockStage) { TCPReader tcpReader = new TCPReader(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); - EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); + Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); - SpScPipe.connect(clockPipeline.getOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); + SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); + Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(this.recordThroughputStage); @@ -65,11 +65,11 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { public void init() { super.init(); - StageWithPort<Void, Long> clockPipeline = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage<Void>(clockPipeline)); + Pipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); + this.clockThread = new Thread(new RunnableStage(clockPipeline)); - StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(clockPipeline); - this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); + StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 3fccf84bd51456b60226a389d3aefa67c9276ad3..57bf8054baecae482e50f21c0c60f2ed5dfbf85c 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -45,17 +45,17 @@ public class TcpTraceReconstructionAnalysis extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); + Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage)); + Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - Pipeline<Void, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); - this.workerThread = new Thread(new RunnableStage<Void>(pipeline)); + StageWithPort pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); + this.workerThread = new Thread(new RunnableStage(pipeline)); } - private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); @@ -63,13 +63,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { + private StageWithPort buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages TCPReader tcpReader = new TCPReader(); this.recordCounter = new Counter<IMonitoringRecord>(); @@ -79,7 +79,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>(); this.traceCounter = new Counter<TraceEventRecords>(); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); @@ -92,11 +92,11 @@ public class TcpTraceReconstructionAnalysis extends Analysis { SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); - SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<Void, TraceEventRecords> pipeline = new Pipeline<Void, TraceEventRecords>(); + Pipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new Pipeline<TCPReader, Sink<TraceEventRecords>>(); pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(instanceOfFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index cbfe94b3c5702bc7235f2157e2bb4b179c9e2730..26ae3b495719efa43aded35d8b76ea6f9108f4b4 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -48,21 +48,21 @@ public class TraceReconstructionAnalysis extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); - this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); + Clock clockStage = this.buildClockPipeline(); + this.clockThread = new Thread(new RunnableStage(clockStage)); - Pipeline<File, ?> pipeline = this.buildPipeline(clockStage); - this.workerThread = new Thread(new RunnableStage<File>(pipeline)); + StageWithPort pipeline = this.buildPipeline(clockStage); + this.workerThread = new Thread(new RunnableStage(pipeline)); } - private StageWithPort<Void, Long> buildClockPipeline() { + private Clock buildClockPipeline() { Clock clock = new Clock(); clock.setIntervalDelayInMs(100); return clock; } - private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) { + private StageWithPort buildPipeline(final Clock clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages @@ -100,7 +100,7 @@ public class TraceReconstructionAnalysis extends Analysis { dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir); // create and configure pipeline - Pipeline<File, Void> pipeline = new Pipeline<File, Void>(); + Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(cache); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index f82567ba472ff9aafb1c64111b8cba4c5faf4bc4..0817310f8f0a2e9bbb812632ff83e8cafe11fdbb 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -18,10 +18,10 @@ import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; -import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceCounter; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -49,38 +49,38 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); + Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage)); + Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); + StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { + private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>("TCP reader pipeline"); + Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -89,13 +89,13 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; } - private static class StageFactory<T extends StageWithPort<?, ?>> { + private static class StageFactory<T extends StageWithPort> { private final Constructor<T> constructor; private final List<T> stages = new ArrayList<T>(); @@ -154,8 +154,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } } - private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, - final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { + private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, + final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -168,16 +168,16 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace); Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); - SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); @@ -190,7 +190,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline"); + Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>("Worker pipeline"); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(recordCounter); pipeline.addIntermediateStage(recordThroughputFilter);