diff --git a/.settings/org.moreunit.prefs b/.settings/org.moreunit.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..b23adba051f1b921d58e03e5183f02b2627c5ce1
--- /dev/null
+++ b/.settings/org.moreunit.prefs
@@ -0,0 +1,2 @@
+eclipse.preferences.version=1
+org.moreunit.preferences.version=2
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 6e74f0ca4a4d090cccb81712e82c5bb05048e796..f54fa344dceafa4196316e3200ac99cad014b141 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -22,6 +22,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private boolean reschedulable;
+ /**
+ * cached successor for default output port
+ */
+ private StageWithPort<?, ?> next;
+
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
@@ -81,7 +86,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
- StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
+ // StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
+ StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
do {
next.executeWithPorts();
@@ -102,6 +108,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
@Override
public void onStart() {
// empty default implementation
+ this.next = (this.outputPort.getPipe() != null) ? this.outputPort.getPipe().getTargetPort().getOwningStage() : null;
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
index d55ad342259a9699e06cd1d84e7fd306f0927729..6a614b766246772ac595be4e1796afc8ddd9e160 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
@@ -26,6 +26,11 @@ public class InputPort<T> {
return this.pipe;
}
+ /**
+ * Connects this input port with the given <code>pipe</code> bi-directionally
+ *
+ * @param pipe
+ */
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index b426506e35b586d6722b3e38b032b8eddf43ad2a..36ce913f749d1a1723189fd76a5c65d81d9c9ef1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -5,6 +5,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class OutputPort<T> {
private IPipe<T> pipe;
+ /**
+ * Performance cache: Avoids the following method chain
+ *
+ * <pre>
+ * this.getPipe().getTargetPort().getOwningStage()
+ * </pre>
+ */
+ private StageWithPort<?, ?> cachedTargetStage;
public void send(final T element) {
this.pipe.add(element);
@@ -17,4 +25,13 @@ public class OutputPort<T> {
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
}
+
+ public StageWithPort<?, ?> getCachedTargetStage() {
+ return this.cachedTargetStage;
+ }
+
+ public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) {
+ this.cachedTargetStage = cachedTargetStage;
+ }
+
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
index 35ec227550545ec5713be7d01dae33db6f4ff9a4..04ec724025bb280854c249b3914eb873598f4e49 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
@@ -22,6 +22,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
+ // BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
+ // multiple input ports?
private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage;
// private int startIndex;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index 109824ffda5e28b4c4c287b29b1e1aa41c7495a4..ec4fa42520927ad92c3217c3f6a137e3f61ae006 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -22,6 +22,7 @@ public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new OrderedGrowableArrayPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
index 4c36c839d551d521447797d0b679afc4c551c813..d7eb9a01800dad2f740f9193e9284acb9d916d9d 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
@@ -21,6 +21,7 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new OrderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
index 79321208b4cdfea93782d44f280276595904dba9..b89f27ad4d94ef7e868b7fdfe44acf1e09f3d69a 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
@@ -12,6 +12,7 @@ public class Pipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new Pipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
/*
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index 9440263d549b3b1262cf71c58a4f311a4c7ac13a..c9edc53621e5fae9dc91636e20c312694f39b4ba 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -12,6 +12,7 @@ public class SingleElementPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new SingleElementPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index ece1f1d074beb50afa9c07048043b854ffc3379e..f55d3257200451fc04f6954a8977cf9d1ff94393 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -16,6 +16,7 @@ public class SpScPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new SpScPipe<T>(initialCapacity);
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index 48c2afa17062e810a323c01172fa7d41d344e8a1..729b651e841b1bf5354f2ec8a0fc7212250e0087 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -46,6 +46,7 @@ public class UnorderedGrowablePipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new UnorderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
index f72b987ba40ccd347b79cdc1b6cdc2e74181e5e1..f3492e265eb32f4e6560c23f822063703d70a6ce 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
@@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/
public class CollectorSink<T> extends ConsumerStage<T, Object> {
- private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead
+ private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead
private final List<T> elements;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
index 0ef9b18b8e687e12b2f693c3afbbe11780baed34..134d80aeb98e967a1d917e154272708aa508da25 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -22,8 +22,9 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Delay;
@@ -99,15 +100,15 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
- UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
- UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+ SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+ SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
- UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+ SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
- UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
- UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
+ SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+ OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
- UnorderedGrowablePipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
+ SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
}