From b93e2ab2e733220524f71ba5034658838c70fd2a Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 26 Jun 2014 12:11:06 +0200 Subject: [PATCH] added cached access on target stage --- .settings/org.moreunit.prefs | 2 ++ .../framework/core/AbstractStage.java | 9 ++++++++- .../framework/core/InputPort.java | 5 +++++ .../framework/core/OutputPort.java | 17 +++++++++++++++++ .../framework/core/Pipeline.java | 2 ++ .../core/pipe/OrderedGrowableArrayPipe.java | 1 + .../core/pipe/OrderedGrowablePipe.java | 1 + .../framework/core/pipe/Pipe.java | 1 + .../framework/core/pipe/SingleElementPipe.java | 1 + .../framework/core/pipe/SpScPipe.java | 1 + .../core/pipe/UnorderedGrowablePipe.java | 1 + .../stage/CollectorSink.java | 2 +- .../MethodCallThroughputAnalysis15.java | 15 ++++++++------- 13 files changed, 49 insertions(+), 9 deletions(-) create mode 100644 .settings/org.moreunit.prefs diff --git a/.settings/org.moreunit.prefs b/.settings/org.moreunit.prefs new file mode 100644 index 00000000..b23adba0 --- /dev/null +++ b/.settings/org.moreunit.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +org.moreunit.preferences.version=2 diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 6e74f0ca..f54fa344 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -22,6 +22,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { private boolean reschedulable; + /** + * cached successor for default output port + */ + private StageWithPort<?, ?> next; + public AbstractStage() { this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")"); @@ -81,7 +86,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { protected final void send(final OutputPort<O> outputPort, final O element) { outputPort.send(element); - StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); + // StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); + StageWithPort<?, ?> next = outputPort.getCachedTargetStage(); do { next.executeWithPorts(); @@ -102,6 +108,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { @Override public void onStart() { // empty default implementation + this.next = (this.outputPort.getPipe() != null) ? this.outputPort.getPipe().getTargetPort().getOwningStage() : null; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index d55ad342..6a614b76 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -26,6 +26,11 @@ public class InputPort<T> { return this.pipe; } + /** + * Connects this input port with the given <code>pipe</code> bi-directionally + * + * @param pipe + */ public void setPipe(final IPipe<T> pipe) { this.pipe = pipe; pipe.setTargetPort(this); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index b426506e..36ce913f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -5,6 +5,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public class OutputPort<T> { private IPipe<T> pipe; + /** + * Performance cache: Avoids the following method chain + * + * <pre> + * this.getPipe().getTargetPort().getOwningStage() + * </pre> + */ + private StageWithPort<?, ?> cachedTargetStage; public void send(final T element) { this.pipe.add(element); @@ -17,4 +25,13 @@ public class OutputPort<T> { public void setPipe(final IPipe<T> pipe) { this.pipe = pipe; } + + public StageWithPort<?, ?> getCachedTargetStage() { + return this.cachedTargetStage; + } + + public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) { + this.cachedTargetStage = cachedTargetStage; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 35ec2275..04ec7240 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -22,6 +22,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); private StageWithPort<?, O> lastStage; + // BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to + // multiple input ports? private StageWithPort<?, ?>[] stages; private StageWithPort<?, ?> parentStage; // private int startIndex; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index 109824ff..ec4fa425 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -22,6 +22,7 @@ public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new OrderedGrowableArrayPipe<T>(); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index 4c36c839..d7eb9a01 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -21,6 +21,7 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new OrderedGrowablePipe<T>(); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index 79321208..b89f27ad 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -12,6 +12,7 @@ public class Pipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new Pipe<T>(); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } /* diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index 9440263d..c9edc536 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -12,6 +12,7 @@ public class SingleElementPipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new SingleElementPipe<T>(); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index ece1f1d0..f55d3257 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -16,6 +16,7 @@ public class SpScPipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new SpScPipe<T>(initialCapacity); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 48c2afa1..729b651e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -46,6 +46,7 @@ public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { IPipe<T> pipe = new UnorderedGrowablePipe<T>(); sourcePort.setPipe(pipe); targetPort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java index f72b987b..f3492e26 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; */ public class CollectorSink<T> extends ConsumerStage<T, Object> { - private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead + private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead private final List<T> elements; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java index 0ef9b18b..134d80ae 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -22,8 +22,9 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; -import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.Delay; @@ -99,15 +100,15 @@ public class MethodCallThroughputAnalysis15 extends Analysis { SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); - UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); + SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); - UnorderedGrowablePipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); + SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); return new RunnableStage(pipeline); } -- GitLab