diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 80bfd8df443e5773328216c776845a4d0d653e43..ab8a4be910f9e580b59f809a8232dff499113356 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -48,11 +48,13 @@ public abstract class AbstractStage implements StageWithPort { return false; } - StageWithPort next = outputPort.getCachedTargetStage(); + outputPort.reportNewElement(); - do { - next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead - } while (next.isReschedulable()); + // StageWithPort next = outputPort.getCachedTargetStage(); + // + // do { + // next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead + // } while (next.isReschedulable()); return true; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 97d66d0521a617757332a4aee9fbf894584ffe36..3ce89029e00b078f8b964ead6bb7ff2975da6b60 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -public class OutputPort<T> extends AbstractPort<T> { +public final class OutputPort<T> extends AbstractPort<T> { /** * Performance cache: Avoids the following method chain @@ -11,7 +11,7 @@ public class OutputPort<T> extends AbstractPort<T> { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - private StageWithPort cachedTargetStage; + // private StageWithPort cachedTargetStage; OutputPort() { super(); @@ -26,16 +26,21 @@ public class OutputPort<T> extends AbstractPort<T> { return this.pipe.add(element); } - public StageWithPort getCachedTargetStage() { - return this.cachedTargetStage; - } + // public StageWithPort getCachedTargetStage() { + // return this.cachedTargetStage; + // } + @Deprecated public void setCachedTargetStage(final StageWithPort cachedTargetStage) { - this.cachedTargetStage = cachedTargetStage; + // this.cachedTargetStage = cachedTargetStage; } public void sendSignal(final Signal signal) { this.pipe.setSignal(signal); } + public void reportNewElement() { + this.pipe.reportNewElement(); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index 386bf62b2ef8596020bad8e1c3250dd3f294effb..e914afef5501ec8223e9aeddda7dcd88ef90fe76 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -1,21 +1,20 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; public abstract class AbstractPipe<T> implements IPipe<T> { - // private final AtomicBoolean closed = new AtomicBoolean(); private InputPort<T> targetPort; - // @Override - // public boolean isClosed() { - // return this.closed.get(); - // } - // - // @Override - // public void close() { - // this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement - // } + /** + * Performance cache: Avoids the following method chain + * + * <pre> + * this.getPipe().getTargetPort().getOwningStage() + * </pre> + */ + protected StageWithPort cachedTargetStage; @Override public InputPort<T> getTargetPort() { @@ -25,6 +24,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> { @Override public void setTargetPort(final InputPort<T> targetPort) { this.targetPort = targetPort; + this.cachedTargetStage = targetPort.getOwningStage(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java index 498ada08f792229067e685bcd9f9adf27cadf522..4c8c195f870b0e2f3c8b3e0d2a87ae3b515c142d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java @@ -52,4 +52,9 @@ public final class DummyPipe implements IPipe { @Override public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} + @Override + public void reportNewElement() { + // do nothing + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index a5f2519919e54b6915f010a9cdc6f7caa432038b..7362f6fdaf7638a3317e0e0859d774931e94571d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -29,4 +29,6 @@ public interface IPipe<T> { // BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...} void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort); + void reportNewElement(); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java index 116d2903a0b699c7ebcbfad158f5787cd2eddd75..1e78c8b418e623adbbb0450d33341b3d0b89d903 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -7,8 +7,13 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { @Override public void setSignal(final Signal signal) { if (this.getTargetPort() != null) { - this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); + this.cachedTargetStage.onSignal(signal, this.getTargetPort()); } } + @Override + public final void reportNewElement() { + this.cachedTargetStage.executeWithPorts(); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 8ceb292719e062cb6253b6d001d3f08ac2cbde93..22b5c6a41ee5a25916f4e3c5644a9b134b738745 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -83,4 +83,9 @@ public class SpScPipe<T> extends AbstractPipe<T> { return this.signal.get(); } + @Override + public void reportNewElement() { + // do nothing + } + } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java index 0dd3840c9ab315928842ee91714526180d2a2168..e8c6d799a81537f6d61cc7ab20acaf5930ecce01 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java @@ -89,10 +89,10 @@ public class ChwWorkComparisonMethodcallWithPorts implements PerformanceCheckPro // assertEquals(53, value17, 4.1); // +0 // since 27.08.2014 (incl.) - assertEquals(112, value14, 5.1); // +16 - assertEquals(42, value10, 2.1); // +16 - assertEquals(41, value11, 4.1); // -3 - assertEquals(42, value9, 2.1); // +6 + assertEquals(102, value14, 5.1); // +16 + assertEquals(56, value10, 2.1); // +30 + assertEquals(64, value11, 4.1); // +15 + assertEquals(77, value9, 2.1); // +35 assertEquals(44, value15, 4.1); // +0 assertEquals(53, value17, 4.1); // +0 diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java index e62d0572dafbe7de40df68b01811dd8ee3dfad34..8f50cbe13880243478686ba947f21d75637b5448 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -23,7 +23,9 @@ import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; @@ -32,13 +34,11 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis14 extends Analysis { - private static final int SPSC_INITIAL_CAPACITY = 4; - private long numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; @@ -75,13 +75,19 @@ public class MethodCallThroughputAnalysis14 extends Analysis { pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY); - SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort(), SPSC_INITIAL_CAPACITY); + PipeFactory pipeFactory = new PipeFactory(); + IPipe<TimestampObject> pipe = pipeFactory.create(ThreadCommunication.INTRA); + pipe.connectPorts(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + pipe = pipeFactory.create(ThreadCommunication.INTRA); + pipe.connectPorts(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort(), SPSC_INITIAL_CAPACITY); + pipe = pipeFactory.create(ThreadCommunication.INTRA); + pipe.connectPorts(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY); - SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort(), SPSC_INITIAL_CAPACITY); + pipe = pipeFactory.create(ThreadCommunication.INTRA); + pipe.connectPorts(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + pipe = pipeFactory.create(ThreadCommunication.INTRA); + pipe.connectPorts(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); return pipeline; }