diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 7e8e853a882a91c7e46f03b245fc13c2226876a7..9c3b790518da813762d492e4ef13c56fbfdd61e6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -102,6 +102,27 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { return this.id; } + /** + * May not be invoked outside of IPipe implementations + */ + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + switch (signal) { + case FINISHED: + this.onFinished(); + break; + default: + this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); + break; + } + + this.getOutputPort().sendSignal(signal); + } + + protected void onFinished() { + // empty default implementation + } + @Override public String toString() { return this.getClass().getName() + ": " + this.id; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 36ce913f749d1a1723189fd76a5c65d81d9c9ef1..7500e4de2a40f69122046c664ead4c050133c6db 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -34,4 +34,8 @@ public class OutputPort<T> { this.cachedTargetStage = cachedTargetStage; } + public void sendSignal(final Signal signal) { + this.pipe.setSignal(signal); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 89c4270e55110595f12e201dd2e347c45b54ee1d..3a90f164165a7608023202bb7f726c16d0cfd17c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -61,13 +61,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { public void executeWithPorts() { StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; - do { - headStage.executeWithPorts(); - } while (headStage.isReschedulable()); + // do { + headStage.executeWithPorts(); + // } while (headStage.isReschedulable()); // headStage.sendFinishedSignalToAllSuccessorStages(); - this.updateRescheduable(headStage); + // this.updateRescheduable(headStage); } private final void updateRescheduable(final StageWithPort<?, ?> stage) { @@ -164,4 +164,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.lastStage = null; } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port."); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index ab1892e71c3783d5160cc81f9b72b20259717b5c..62b236375f1e4a784eb3c440b8e486130cd71469 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; -public class RunnableStage implements Runnable { +public class RunnableStage<I> implements Runnable { - private final StageWithPort<?, ?> stage; + private final StageWithPort<I, ?> stage; private final Log logger; - public RunnableStage(final StageWithPort<?, ?> stage) { + public RunnableStage(final StageWithPort<I, ?> stage) { this.stage = stage; this.logger = LogFactory.getLog(stage.getClass()); } @@ -22,12 +22,11 @@ public class RunnableStage implements Runnable { this.stage.executeWithPorts(); } while (this.stage.isReschedulable()); - // stage.sendFinishedSignalToAllSuccessorStages(); + this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort()); } catch (RuntimeException e) { this.logger.error("Terminating thread due to the following exception: ", e); throw e; } } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java new file mode 100644 index 0000000000000000000000000000000000000000..0e3eadce9f1065d4ed113c94ce300dea25c35eb0 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java @@ -0,0 +1,6 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +public enum Signal { + FINISHED + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 8cea382223be9d6810eacb8ff3da4fd94145158a..84bc4309cfd5c7e8a81ce75802cce903f51c75b6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -1,6 +1,5 @@ package teetime.variant.methodcallWithPorts.framework.core; - public interface StageWithPort<I, O> { InputPort<I> getInputPort(); @@ -20,4 +19,6 @@ public interface StageWithPort<I, O> { void onIsPipelineHead(); void onStart(); + + void onSignal(Signal signal, InputPort<?> inputPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index 5522b69b96c9ea048095d9da824dfbf64294f452..386bf62b2ef8596020bad8e1c3250dd3f294effb 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -1,23 +1,21 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; -import java.util.concurrent.atomic.AtomicBoolean; - import teetime.variant.methodcallWithPorts.framework.core.InputPort; public abstract class AbstractPipe<T> implements IPipe<T> { - private final AtomicBoolean closed = new AtomicBoolean(); + // private final AtomicBoolean closed = new AtomicBoolean(); private InputPort<T> targetPort; - @Override - public boolean isClosed() { - return this.closed.get(); - } - - @Override - public void close() { - this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement - } + // @Override + // public boolean isClosed() { + // return this.closed.get(); + // } + // + // @Override + // public void close() { + // this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement + // } @Override public InputPort<T> getTargetPort() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 2d13264878c4d08a3d257804927ad20a48784ea4..4b94be69c33616979c6dc78e6a8d1a482deb6dc9 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -1,6 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; public interface IPipe<T> { @@ -14,12 +15,14 @@ public interface IPipe<T> { T readLast(); - void close(); - - boolean isClosed(); + // void close(); + // + // boolean isClosed(); InputPort<T> getTargetPort(); void setTargetPort(InputPort<T> targetPort); + void setSignal(Signal signal); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..997665e1c82faa7c20ec867de6adabcb3c6dbef9 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -0,0 +1,12 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.variant.methodcallWithPorts.framework.core.Signal; + +public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { + + @Override + public void setSignal(final Signal signal) { + this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index ec4fa42520927ad92c3217c3f6a137e3f61ae006..fc3d224a0ad8a1654171b7cf974f4d4a74e2677b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { +public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { private CircularArray<T> elements; private int head; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index d7eb9a01800dad2f740f9193e9284acb9d916d9d..eb92d9433e9745a1d29bb46788c0d29e497079d8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -5,7 +5,7 @@ import java.util.LinkedList; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowablePipe<T> extends AbstractPipe<T> { +public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { private LinkedList<T> elements; @@ -48,5 +48,4 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> { public int size() { return this.elements.size(); } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index b89f27ad4d94ef7e868b7fdfe44acf1e09f3d69a..eda193f371596c4b95515dd9b70277a3fd28ccf5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -4,7 +4,7 @@ import teetime.util.list.CommittableResizableArrayQueue; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Pipe<T> extends AbstractPipe<T> { +public class Pipe<T> extends IntraThreadPipe<T> { private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); @@ -66,5 +66,4 @@ public class Pipe<T> extends AbstractPipe<T> { public int size() { return this.elements.size(); } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index c9edc53621e5fae9dc91636e20c312694f39b4ba..c3581c1a6369bdf9841b8545c55f1f67b01fe3d7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -3,8 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -//public class SingleElementPipe<T> implements IPipe<T> { -public class SingleElementPipe<T> extends AbstractPipe<T> { +public class SingleElementPipe<T> extends IntraThreadPipe<T> { private T element; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 9570f8fd338791e9efaf816812d35054e6197fc8..ed380b19e855b18b643fa06eee2ca5c78f6b2e42 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -1,13 +1,17 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import java.util.concurrent.atomic.AtomicReference; + import teetime.util.concurrent.spsc.FFBufferOrdered3; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; public class SpScPipe<T> extends AbstractPipe<T> { private final FFBufferOrdered3<T> queue; private int maxSize; + private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); private SpScPipe(final int capacity) { this.queue = new FFBufferOrdered3<T>(capacity); @@ -53,4 +57,13 @@ public class SpScPipe<T> extends AbstractPipe<T> { return this.maxSize; } + @Override + public void setSignal(final Signal signal) { + this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + } + + public Signal getSignal() { + return this.signal.get(); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 729b651e841b1bf5354f2ec8a0fc7212250e0087..1ead00769b9f07daca2e04b70ead6da3dfbe9bb8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -3,33 +3,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { - - // private static final class ArrayWrapper2<T> { - // - // private final T[] elements; - // - // // private int lastFreeIndex; - // - // @SuppressWarnings("unchecked") - // public ArrayWrapper2(final int initialCapacity) { - // super(); - // this.elements = (T[]) new Object[initialCapacity]; - // } - // - // public final T get(final int index) { - // return this.elements[index]; - // } - // - // public final void put(final int index, final T element) { - // this.elements[index] = element; - // } - // - // public final int getCapacity() { - // return this.elements.length; - // } - // - // } +public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { + private final int MIN_CAPACITY; private T[] elements; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java index e60b19e1221d08a8f5219689638e0a27cdc5950b..663b4d4703d01c56ea4daea64dd9910061284d99 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java @@ -36,12 +36,12 @@ public final class Distributor<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - for (OutputPort<?> op : this.outputPorts) { - op.getPipe().close(); - if (this.logger.isDebugEnabled()) { - this.logger.debug("End signal sent, size: " + op.getPipe().size()); - } - } + // for (OutputPort<?> op : this.outputPorts) { + // op.getPipe().close(); + // if (this.logger.isDebugEnabled()) { + // this.logger.debug("End signal sent, size: " + op.getPipe().size()); + // } + // } // for (OutputPort<?> op : this.outputPorts) { // op.pipe = null; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java index 8019b0d849654206d701b4c6ae6b47c1a40f7c8a..62a092f5d569b48a8c14b86a285db56b113415d9 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java @@ -6,6 +6,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; public class EndStage<T> implements StageWithPort<T, T> { @@ -64,4 +65,9 @@ public class EndStage<T> implements StageWithPort<T, T> { } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + // do nothing + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 26cb2acd8960a9a09defba578749bc139cf44184..55f3591ea2b33a2ed4ee01ecd2cfc98f5e2186fc 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -2,9 +2,13 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; public class Relay<T> extends AbstractStage<T, T> { + private SpScPipe<T> inputPipe; + public Relay() { this.setReschedulable(true); } @@ -13,7 +17,8 @@ public class Relay<T> extends AbstractStage<T, T> { public void executeWithPorts() { T element = this.getInputPort().receive(); if (null == element) { - if (this.getInputPort().getPipe().isClosed()) { + // if (this.getInputPort().getPipe().isClosed()) { + if (this.inputPipe.getSignal() == Signal.FINISHED) { this.setReschedulable(false); assert 0 == this.getInputPort().getPipe().size(); } @@ -23,11 +28,17 @@ public class Relay<T> extends AbstractStage<T, T> { this.send(element); } + @Override + public void onStart() { + this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe(); + super.onStart(); + } + @Override public void onIsPipelineHead() { - if (this.getInputPort().getPipe().isClosed()) { - this.setReschedulable(false); - } + // if (this.getInputPort().getPipe().isClosed()) { + // this.setReschedulable(false); + // } } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java index 8fc677ae516f7c3f3cbb2e2fa0e3f9f71f324250..46454a89a1e811e80a4d041d453ba7015ea1f1c7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java @@ -54,10 +54,10 @@ public class Distributor<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - for (OutputPort<T> op : this.outputPortList) { - op.getPipe().close(); - System.out.println("End signal sent, size: " + op.getPipe().size()); - } + // for (OutputPort<T> op : this.outputPortList) { + // op.getPipe().close(); + // System.out.println("End signal sent, size: " + op.getPipe().size()); + // } } @Override @@ -83,4 +83,5 @@ public class Distributor<T> extends AbstractStage<T, T> { this.execute5(element); } + } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index deafa63b9e4cf293c9e0eed0f57d5ceafb87c7aa..f942090813143f9cf83defb74ed16d6e794d3790 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -24,6 +24,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -159,7 +160,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { for (int i = 0; i < this.numInputObjects; i++) { startPipe.add(this.inputObjectCreator.create()); } - startPipe.close(); + // startPipe.close(); + startPipe.setSignal(Signal.FINISHED); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());