From 78182435bb7aaf0e52135aa54c80b8c0599cd088 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 4 Jul 2014 09:57:53 +0200 Subject: [PATCH] added generic Signal concept --- .../framework/core/AbstractStage.java | 21 ++++++++++++++ .../framework/core/OutputPort.java | 4 +++ .../framework/core/Pipeline.java | 13 ++++++--- .../framework/core/RunnableStage.java | 9 +++--- .../framework/core/Signal.java | 6 ++++ .../framework/core/StageWithPort.java | 3 +- .../framework/core/pipe/AbstractPipe.java | 22 +++++++------- .../framework/core/pipe/IPipe.java | 9 ++++-- .../framework/core/pipe/IntraThreadPipe.java | 12 ++++++++ .../core/pipe/OrderedGrowableArrayPipe.java | 2 +- .../core/pipe/OrderedGrowablePipe.java | 3 +- .../framework/core/pipe/Pipe.java | 3 +- .../core/pipe/SingleElementPipe.java | 3 +- .../framework/core/pipe/SpScPipe.java | 13 +++++++++ .../core/pipe/UnorderedGrowablePipe.java | 29 ++----------------- .../stage/Distributor.java | 12 ++++---- .../methodcallWithPorts/stage/EndStage.java | 6 ++++ .../methodcallWithPorts/stage/Relay.java | 19 +++++++++--- .../stage/basic/distributor/Distributor.java | 9 +++--- .../MethodCallThroughputAnalysis17.java | 4 ++- 20 files changed, 128 insertions(+), 74 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 7e8e853a..9c3b7905 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -102,6 +102,27 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { return this.id; } + /** + * May not be invoked outside of IPipe implementations + */ + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + switch (signal) { + case FINISHED: + this.onFinished(); + break; + default: + this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); + break; + } + + this.getOutputPort().sendSignal(signal); + } + + protected void onFinished() { + // empty default implementation + } + @Override public String toString() { return this.getClass().getName() + ": " + this.id; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 36ce913f..7500e4de 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -34,4 +34,8 @@ public class OutputPort<T> { this.cachedTargetStage = cachedTargetStage; } + public void sendSignal(final Signal signal) { + this.pipe.setSignal(signal); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 89c4270e..3a90f164 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -61,13 +61,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { public void executeWithPorts() { StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; - do { - headStage.executeWithPorts(); - } while (headStage.isReschedulable()); + // do { + headStage.executeWithPorts(); + // } while (headStage.isReschedulable()); // headStage.sendFinishedSignalToAllSuccessorStages(); - this.updateRescheduable(headStage); + // this.updateRescheduable(headStage); } private final void updateRescheduable(final StageWithPort<?, ?> stage) { @@ -164,4 +164,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.lastStage = null; } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port."); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index ab1892e7..62b23637 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; -public class RunnableStage implements Runnable { +public class RunnableStage<I> implements Runnable { - private final StageWithPort<?, ?> stage; + private final StageWithPort<I, ?> stage; private final Log logger; - public RunnableStage(final StageWithPort<?, ?> stage) { + public RunnableStage(final StageWithPort<I, ?> stage) { this.stage = stage; this.logger = LogFactory.getLog(stage.getClass()); } @@ -22,12 +22,11 @@ public class RunnableStage implements Runnable { this.stage.executeWithPorts(); } while (this.stage.isReschedulable()); - // stage.sendFinishedSignalToAllSuccessorStages(); + this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort()); } catch (RuntimeException e) { this.logger.error("Terminating thread due to the following exception: ", e); throw e; } } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java new file mode 100644 index 00000000..0e3eadce --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java @@ -0,0 +1,6 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +public enum Signal { + FINISHED + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 8cea3822..84bc4309 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -1,6 +1,5 @@ package teetime.variant.methodcallWithPorts.framework.core; - public interface StageWithPort<I, O> { InputPort<I> getInputPort(); @@ -20,4 +19,6 @@ public interface StageWithPort<I, O> { void onIsPipelineHead(); void onStart(); + + void onSignal(Signal signal, InputPort<?> inputPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index 5522b69b..386bf62b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -1,23 +1,21 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; -import java.util.concurrent.atomic.AtomicBoolean; - import teetime.variant.methodcallWithPorts.framework.core.InputPort; public abstract class AbstractPipe<T> implements IPipe<T> { - private final AtomicBoolean closed = new AtomicBoolean(); + // private final AtomicBoolean closed = new AtomicBoolean(); private InputPort<T> targetPort; - @Override - public boolean isClosed() { - return this.closed.get(); - } - - @Override - public void close() { - this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement - } + // @Override + // public boolean isClosed() { + // return this.closed.get(); + // } + // + // @Override + // public void close() { + // this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement + // } @Override public InputPort<T> getTargetPort() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 2d132648..4b94be69 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -1,6 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; public interface IPipe<T> { @@ -14,12 +15,14 @@ public interface IPipe<T> { T readLast(); - void close(); - - boolean isClosed(); + // void close(); + // + // boolean isClosed(); InputPort<T> getTargetPort(); void setTargetPort(InputPort<T> targetPort); + void setSignal(Signal signal); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java new file mode 100644 index 00000000..997665e1 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -0,0 +1,12 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.variant.methodcallWithPorts.framework.core.Signal; + +public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { + + @Override + public void setSignal(final Signal signal) { + this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index ec4fa425..fc3d224a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { +public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { private CircularArray<T> elements; private int head; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index d7eb9a01..eb92d943 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -5,7 +5,7 @@ import java.util.LinkedList; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowablePipe<T> extends AbstractPipe<T> { +public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { private LinkedList<T> elements; @@ -48,5 +48,4 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> { public int size() { return this.elements.size(); } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index b89f27ad..eda193f3 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -4,7 +4,7 @@ import teetime.util.list.CommittableResizableArrayQueue; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Pipe<T> extends AbstractPipe<T> { +public class Pipe<T> extends IntraThreadPipe<T> { private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); @@ -66,5 +66,4 @@ public class Pipe<T> extends AbstractPipe<T> { public int size() { return this.elements.size(); } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index c9edc536..c3581c1a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -3,8 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -//public class SingleElementPipe<T> implements IPipe<T> { -public class SingleElementPipe<T> extends AbstractPipe<T> { +public class SingleElementPipe<T> extends IntraThreadPipe<T> { private T element; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 9570f8fd..ed380b19 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -1,13 +1,17 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import java.util.concurrent.atomic.AtomicReference; + import teetime.util.concurrent.spsc.FFBufferOrdered3; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; public class SpScPipe<T> extends AbstractPipe<T> { private final FFBufferOrdered3<T> queue; private int maxSize; + private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); private SpScPipe(final int capacity) { this.queue = new FFBufferOrdered3<T>(capacity); @@ -53,4 +57,13 @@ public class SpScPipe<T> extends AbstractPipe<T> { return this.maxSize; } + @Override + public void setSignal(final Signal signal) { + this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + } + + public Signal getSignal() { + return this.signal.get(); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 729b651e..1ead0076 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -3,33 +3,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { - - // private static final class ArrayWrapper2<T> { - // - // private final T[] elements; - // - // // private int lastFreeIndex; - // - // @SuppressWarnings("unchecked") - // public ArrayWrapper2(final int initialCapacity) { - // super(); - // this.elements = (T[]) new Object[initialCapacity]; - // } - // - // public final T get(final int index) { - // return this.elements[index]; - // } - // - // public final void put(final int index, final T element) { - // this.elements[index] = element; - // } - // - // public final int getCapacity() { - // return this.elements.length; - // } - // - // } +public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { + private final int MIN_CAPACITY; private T[] elements; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java index e60b19e1..663b4d47 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java @@ -36,12 +36,12 @@ public final class Distributor<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - for (OutputPort<?> op : this.outputPorts) { - op.getPipe().close(); - if (this.logger.isDebugEnabled()) { - this.logger.debug("End signal sent, size: " + op.getPipe().size()); - } - } + // for (OutputPort<?> op : this.outputPorts) { + // op.getPipe().close(); + // if (this.logger.isDebugEnabled()) { + // this.logger.debug("End signal sent, size: " + op.getPipe().size()); + // } + // } // for (OutputPort<?> op : this.outputPorts) { // op.pipe = null; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java index 8019b0d8..62a092f5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java @@ -6,6 +6,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; public class EndStage<T> implements StageWithPort<T, T> { @@ -64,4 +65,9 @@ public class EndStage<T> implements StageWithPort<T, T> { } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + // do nothing + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 26cb2acd..55f3591e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -2,9 +2,13 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; public class Relay<T> extends AbstractStage<T, T> { + private SpScPipe<T> inputPipe; + public Relay() { this.setReschedulable(true); } @@ -13,7 +17,8 @@ public class Relay<T> extends AbstractStage<T, T> { public void executeWithPorts() { T element = this.getInputPort().receive(); if (null == element) { - if (this.getInputPort().getPipe().isClosed()) { + // if (this.getInputPort().getPipe().isClosed()) { + if (this.inputPipe.getSignal() == Signal.FINISHED) { this.setReschedulable(false); assert 0 == this.getInputPort().getPipe().size(); } @@ -23,11 +28,17 @@ public class Relay<T> extends AbstractStage<T, T> { this.send(element); } + @Override + public void onStart() { + this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe(); + super.onStart(); + } + @Override public void onIsPipelineHead() { - if (this.getInputPort().getPipe().isClosed()) { - this.setReschedulable(false); - } + // if (this.getInputPort().getPipe().isClosed()) { + // this.setReschedulable(false); + // } } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java index 8fc677ae..46454a89 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java @@ -54,10 +54,10 @@ public class Distributor<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - for (OutputPort<T> op : this.outputPortList) { - op.getPipe().close(); - System.out.println("End signal sent, size: " + op.getPipe().size()); - } + // for (OutputPort<T> op : this.outputPortList) { + // op.getPipe().close(); + // System.out.println("End signal sent, size: " + op.getPipe().size()); + // } } @Override @@ -83,4 +83,5 @@ public class Distributor<T> extends AbstractStage<T, T> { this.execute5(element); } + } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index deafa63b..f9420908 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -24,6 +24,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -159,7 +160,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { for (int i = 0; i < this.numInputObjects; i++) { startPipe.add(this.inputObjectCreator.create()); } - startPipe.close(); + // startPipe.close(); + startPipe.setSignal(Signal.FINISHED); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); -- GitLab