Skip to content
Snippets Groups Projects
Commit e53a19dc authored by Christian Wulf's avatar Christian Wulf
Browse files

added generic Signal concept

parent 707ed229
Branches
Tags
No related merge requests found
Showing
with 128 additions and 74 deletions
...@@ -102,6 +102,27 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -102,6 +102,27 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return this.id; return this.id;
} }
/**
* May not be invoked outside of IPipe implementations
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
this.getOutputPort().sendSignal(signal);
}
protected void onFinished() {
// empty default implementation
}
@Override @Override
public String toString() { public String toString() {
return this.getClass().getName() + ": " + this.id; return this.getClass().getName() + ": " + this.id;
......
...@@ -34,4 +34,8 @@ public class OutputPort<T> { ...@@ -34,4 +34,8 @@ public class OutputPort<T> {
this.cachedTargetStage = cachedTargetStage; this.cachedTargetStage = cachedTargetStage;
} }
public void sendSignal(final Signal signal) {
this.pipe.setSignal(signal);
}
} }
...@@ -61,13 +61,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -61,13 +61,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
public void executeWithPorts() { public void executeWithPorts() {
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
do { // do {
headStage.executeWithPorts(); headStage.executeWithPorts();
} while (headStage.isReschedulable()); // } while (headStage.isReschedulable());
// headStage.sendFinishedSignalToAllSuccessorStages(); // headStage.sendFinishedSignalToAllSuccessorStages();
this.updateRescheduable(headStage); // this.updateRescheduable(headStage);
} }
private final void updateRescheduable(final StageWithPort<?, ?> stage) { private final void updateRescheduable(final StageWithPort<?, ?> stage) {
...@@ -164,4 +164,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -164,4 +164,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.lastStage = null; this.lastStage = null;
} }
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port.");
}
} }
...@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core; ...@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core;
import kieker.common.logging.Log; import kieker.common.logging.Log;
import kieker.common.logging.LogFactory; import kieker.common.logging.LogFactory;
public class RunnableStage implements Runnable { public class RunnableStage<I> implements Runnable {
private final StageWithPort<?, ?> stage; private final StageWithPort<I, ?> stage;
private final Log logger; private final Log logger;
public RunnableStage(final StageWithPort<?, ?> stage) { public RunnableStage(final StageWithPort<I, ?> stage) {
this.stage = stage; this.stage = stage;
this.logger = LogFactory.getLog(stage.getClass()); this.logger = LogFactory.getLog(stage.getClass());
} }
...@@ -22,12 +22,11 @@ public class RunnableStage implements Runnable { ...@@ -22,12 +22,11 @@ public class RunnableStage implements Runnable {
this.stage.executeWithPorts(); this.stage.executeWithPorts();
} while (this.stage.isReschedulable()); } while (this.stage.isReschedulable());
// stage.sendFinishedSignalToAllSuccessorStages(); this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort());
} catch (RuntimeException e) { } catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e); this.logger.error("Terminating thread due to the following exception: ", e);
throw e; throw e;
} }
} }
} }
package teetime.variant.methodcallWithPorts.framework.core;
public enum Signal {
FINISHED
}
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
public interface StageWithPort<I, O> { public interface StageWithPort<I, O> {
InputPort<I> getInputPort(); InputPort<I> getInputPort();
...@@ -20,4 +19,6 @@ public interface StageWithPort<I, O> { ...@@ -20,4 +19,6 @@ public interface StageWithPort<I, O> {
void onIsPipelineHead(); void onIsPipelineHead();
void onStart(); void onStart();
void onSignal(Signal signal, InputPort<?> inputPort);
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public abstract class AbstractPipe<T> implements IPipe<T> { public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean(); // private final AtomicBoolean closed = new AtomicBoolean();
private InputPort<T> targetPort; private InputPort<T> targetPort;
@Override // @Override
public boolean isClosed() { // public boolean isClosed() {
return this.closed.get(); // return this.closed.get();
} // }
//
@Override // @Override
public void close() { // public void close() {
this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement // this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
} // }
@Override @Override
public InputPort<T> getTargetPort() { public InputPort<T> getTargetPort() {
......
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public interface IPipe<T> { public interface IPipe<T> {
...@@ -14,12 +15,14 @@ public interface IPipe<T> { ...@@ -14,12 +15,14 @@ public interface IPipe<T> {
T readLast(); T readLast();
void close(); // void close();
//
boolean isClosed(); // boolean isClosed();
InputPort<T> getTargetPort(); InputPort<T> getTargetPort();
void setTargetPort(InputPort<T> targetPort); void setTargetPort(InputPort<T> targetPort);
void setSignal(Signal signal);
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
@Override
public void setSignal(final Signal signal) {
this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
}
}
...@@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray; ...@@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
private CircularArray<T> elements; private CircularArray<T> elements;
private int head; private int head;
......
...@@ -5,7 +5,7 @@ import java.util.LinkedList; ...@@ -5,7 +5,7 @@ import java.util.LinkedList;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowablePipe<T> extends AbstractPipe<T> { public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
private LinkedList<T> elements; private LinkedList<T> elements;
...@@ -48,5 +48,4 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> { ...@@ -48,5 +48,4 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
public int size() { public int size() {
return this.elements.size(); return this.elements.size();
} }
} }
...@@ -4,7 +4,7 @@ import teetime.util.list.CommittableResizableArrayQueue; ...@@ -4,7 +4,7 @@ import teetime.util.list.CommittableResizableArrayQueue;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Pipe<T> extends AbstractPipe<T> { public class Pipe<T> extends IntraThreadPipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
...@@ -66,5 +66,4 @@ public class Pipe<T> extends AbstractPipe<T> { ...@@ -66,5 +66,4 @@ public class Pipe<T> extends AbstractPipe<T> {
public int size() { public int size() {
return this.elements.size(); return this.elements.size();
} }
} }
...@@ -3,8 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; ...@@ -3,8 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
//public class SingleElementPipe<T> implements IPipe<T> { public class SingleElementPipe<T> extends IntraThreadPipe<T> {
public class SingleElementPipe<T> extends AbstractPipe<T> {
private T element; private T element;
......
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicReference;
import teetime.util.concurrent.spsc.FFBufferOrdered3; import teetime.util.concurrent.spsc.FFBufferOrdered3;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public class SpScPipe<T> extends AbstractPipe<T> { public class SpScPipe<T> extends AbstractPipe<T> {
private final FFBufferOrdered3<T> queue; private final FFBufferOrdered3<T> queue;
private int maxSize; private int maxSize;
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
private SpScPipe(final int capacity) { private SpScPipe(final int capacity) {
this.queue = new FFBufferOrdered3<T>(capacity); this.queue = new FFBufferOrdered3<T>(capacity);
...@@ -53,4 +57,13 @@ public class SpScPipe<T> extends AbstractPipe<T> { ...@@ -53,4 +57,13 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.maxSize; return this.maxSize;
} }
@Override
public void setSignal(final Signal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
}
public Signal getSignal() {
return this.signal.get();
}
} }
...@@ -3,33 +3,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; ...@@ -3,33 +3,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
// private static final class ArrayWrapper2<T> {
//
// private final T[] elements;
//
// // private int lastFreeIndex;
//
// @SuppressWarnings("unchecked")
// public ArrayWrapper2(final int initialCapacity) {
// super();
// this.elements = (T[]) new Object[initialCapacity];
// }
//
// public final T get(final int index) {
// return this.elements[index];
// }
//
// public final void put(final int index, final T element) {
// this.elements[index] = element;
// }
//
// public final int getCapacity() {
// return this.elements.length;
// }
//
// }
private final int MIN_CAPACITY; private final int MIN_CAPACITY;
private T[] elements; private T[] elements;
......
...@@ -36,12 +36,12 @@ public final class Distributor<T> extends AbstractStage<T, T> { ...@@ -36,12 +36,12 @@ public final class Distributor<T> extends AbstractStage<T, T> {
@Override @Override
public void onIsPipelineHead() { public void onIsPipelineHead() {
for (OutputPort<?> op : this.outputPorts) { // for (OutputPort<?> op : this.outputPorts) {
op.getPipe().close(); // op.getPipe().close();
if (this.logger.isDebugEnabled()) { // if (this.logger.isDebugEnabled()) {
this.logger.debug("End signal sent, size: " + op.getPipe().size()); // this.logger.debug("End signal sent, size: " + op.getPipe().size());
} // }
} // }
// for (OutputPort<?> op : this.outputPorts) { // for (OutputPort<?> op : this.outputPorts) {
// op.pipe = null; // op.pipe = null;
......
...@@ -6,6 +6,7 @@ import java.util.List; ...@@ -6,6 +6,7 @@ import java.util.List;
import teetime.util.ConstructorClosure; import teetime.util.ConstructorClosure;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> { public class EndStage<T> implements StageWithPort<T, T> {
...@@ -64,4 +65,9 @@ public class EndStage<T> implements StageWithPort<T, T> { ...@@ -64,4 +65,9 @@ public class EndStage<T> implements StageWithPort<T, T> {
} }
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
// do nothing
}
} }
...@@ -2,9 +2,13 @@ package teetime.variant.methodcallWithPorts.stage; ...@@ -2,9 +2,13 @@ package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue; import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
public class Relay<T> extends AbstractStage<T, T> { public class Relay<T> extends AbstractStage<T, T> {
private SpScPipe<T> inputPipe;
public Relay() { public Relay() {
this.setReschedulable(true); this.setReschedulable(true);
} }
...@@ -13,7 +17,8 @@ public class Relay<T> extends AbstractStage<T, T> { ...@@ -13,7 +17,8 @@ public class Relay<T> extends AbstractStage<T, T> {
public void executeWithPorts() { public void executeWithPorts() {
T element = this.getInputPort().receive(); T element = this.getInputPort().receive();
if (null == element) { if (null == element) {
if (this.getInputPort().getPipe().isClosed()) { // if (this.getInputPort().getPipe().isClosed()) {
if (this.inputPipe.getSignal() == Signal.FINISHED) {
this.setReschedulable(false); this.setReschedulable(false);
assert 0 == this.getInputPort().getPipe().size(); assert 0 == this.getInputPort().getPipe().size();
} }
...@@ -24,10 +29,16 @@ public class Relay<T> extends AbstractStage<T, T> { ...@@ -24,10 +29,16 @@ public class Relay<T> extends AbstractStage<T, T> {
} }
@Override @Override
public void onIsPipelineHead() { public void onStart() {
if (this.getInputPort().getPipe().isClosed()) { this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
this.setReschedulable(false); super.onStart();
} }
@Override
public void onIsPipelineHead() {
// if (this.getInputPort().getPipe().isClosed()) {
// this.setReschedulable(false);
// }
} }
@Override @Override
......
...@@ -54,10 +54,10 @@ public class Distributor<T> extends AbstractStage<T, T> { ...@@ -54,10 +54,10 @@ public class Distributor<T> extends AbstractStage<T, T> {
@Override @Override
public void onIsPipelineHead() { public void onIsPipelineHead() {
for (OutputPort<T> op : this.outputPortList) { // for (OutputPort<T> op : this.outputPortList) {
op.getPipe().close(); // op.getPipe().close();
System.out.println("End signal sent, size: " + op.getPipe().size()); // System.out.println("End signal sent, size: " + op.getPipe().size());
} // }
} }
@Override @Override
...@@ -83,4 +83,5 @@ public class Distributor<T> extends AbstractStage<T, T> { ...@@ -83,4 +83,5 @@ public class Distributor<T> extends AbstractStage<T, T> {
this.execute5(element); this.execute5(element);
} }
} }
...@@ -24,6 +24,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; ...@@ -24,6 +24,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
...@@ -159,7 +160,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -159,7 +160,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
for (int i = 0; i < this.numInputObjects; i++) { for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.create()); startPipe.add(this.inputObjectCreator.create());
} }
startPipe.close(); // startPipe.close();
startPipe.setSignal(Signal.FINISHED);
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment