Skip to content
Snippets Groups Projects
Commit 78182435 authored by Christian Wulf's avatar Christian Wulf
Browse files

added generic Signal concept

parent f5a669dc
Branches
Tags
No related merge requests found
Showing
with 128 additions and 74 deletions
......@@ -102,6 +102,27 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return this.id;
}
/**
* May not be invoked outside of IPipe implementations
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
this.getOutputPort().sendSignal(signal);
}
protected void onFinished() {
// empty default implementation
}
@Override
public String toString() {
return this.getClass().getName() + ": " + this.id;
......
......@@ -34,4 +34,8 @@ public class OutputPort<T> {
this.cachedTargetStage = cachedTargetStage;
}
public void sendSignal(final Signal signal) {
this.pipe.setSignal(signal);
}
}
......@@ -61,13 +61,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
public void executeWithPorts() {
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
do {
// do {
headStage.executeWithPorts();
} while (headStage.isReschedulable());
// } while (headStage.isReschedulable());
// headStage.sendFinishedSignalToAllSuccessorStages();
this.updateRescheduable(headStage);
// this.updateRescheduable(headStage);
}
private final void updateRescheduable(final StageWithPort<?, ?> stage) {
......@@ -164,4 +164,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.lastStage = null;
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port.");
}
}
......@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
public class RunnableStage implements Runnable {
public class RunnableStage<I> implements Runnable {
private final StageWithPort<?, ?> stage;
private final StageWithPort<I, ?> stage;
private final Log logger;
public RunnableStage(final StageWithPort<?, ?> stage) {
public RunnableStage(final StageWithPort<I, ?> stage) {
this.stage = stage;
this.logger = LogFactory.getLog(stage.getClass());
}
......@@ -22,12 +22,11 @@ public class RunnableStage implements Runnable {
this.stage.executeWithPorts();
} while (this.stage.isReschedulable());
// stage.sendFinishedSignalToAllSuccessorStages();
this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort());
} catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e);
throw e;
}
}
}
package teetime.variant.methodcallWithPorts.framework.core;
public enum Signal {
FINISHED
}
package teetime.variant.methodcallWithPorts.framework.core;
public interface StageWithPort<I, O> {
InputPort<I> getInputPort();
......@@ -20,4 +19,6 @@ public interface StageWithPort<I, O> {
void onIsPipelineHead();
void onStart();
void onSignal(Signal signal, InputPort<?> inputPort);
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean();
// private final AtomicBoolean closed = new AtomicBoolean();
private InputPort<T> targetPort;
@Override
public boolean isClosed() {
return this.closed.get();
}
@Override
public void close() {
this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
}
// @Override
// public boolean isClosed() {
// return this.closed.get();
// }
//
// @Override
// public void close() {
// this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
// }
@Override
public InputPort<T> getTargetPort() {
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public interface IPipe<T> {
......@@ -14,12 +15,14 @@ public interface IPipe<T> {
T readLast();
void close();
boolean isClosed();
// void close();
//
// boolean isClosed();
InputPort<T> getTargetPort();
void setTargetPort(InputPort<T> targetPort);
void setSignal(Signal signal);
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
@Override
public void setSignal(final Signal signal) {
this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
}
}
......@@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> {
public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
private CircularArray<T> elements;
private int head;
......
......@@ -5,7 +5,7 @@ import java.util.LinkedList;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
private LinkedList<T> elements;
......@@ -48,5 +48,4 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
public int size() {
return this.elements.size();
}
}
......@@ -4,7 +4,7 @@ import teetime.util.list.CommittableResizableArrayQueue;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Pipe<T> extends AbstractPipe<T> {
public class Pipe<T> extends IntraThreadPipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
......@@ -66,5 +66,4 @@ public class Pipe<T> extends AbstractPipe<T> {
public int size() {
return this.elements.size();
}
}
......@@ -3,8 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
//public class SingleElementPipe<T> implements IPipe<T> {
public class SingleElementPipe<T> extends AbstractPipe<T> {
public class SingleElementPipe<T> extends IntraThreadPipe<T> {
private T element;
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicReference;
import teetime.util.concurrent.spsc.FFBufferOrdered3;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public class SpScPipe<T> extends AbstractPipe<T> {
private final FFBufferOrdered3<T> queue;
private int maxSize;
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
private SpScPipe(final int capacity) {
this.queue = new FFBufferOrdered3<T>(capacity);
......@@ -53,4 +57,13 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.maxSize;
}
@Override
public void setSignal(final Signal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
}
public Signal getSignal() {
return this.signal.get();
}
}
......@@ -3,33 +3,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class UnorderedGrowablePipe<T> extends AbstractPipe<T> {
// private static final class ArrayWrapper2<T> {
//
// private final T[] elements;
//
// // private int lastFreeIndex;
//
// @SuppressWarnings("unchecked")
// public ArrayWrapper2(final int initialCapacity) {
// super();
// this.elements = (T[]) new Object[initialCapacity];
// }
//
// public final T get(final int index) {
// return this.elements[index];
// }
//
// public final void put(final int index, final T element) {
// this.elements[index] = element;
// }
//
// public final int getCapacity() {
// return this.elements.length;
// }
//
// }
public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
private final int MIN_CAPACITY;
private T[] elements;
......
......@@ -36,12 +36,12 @@ public final class Distributor<T> extends AbstractStage<T, T> {
@Override
public void onIsPipelineHead() {
for (OutputPort<?> op : this.outputPorts) {
op.getPipe().close();
if (this.logger.isDebugEnabled()) {
this.logger.debug("End signal sent, size: " + op.getPipe().size());
}
}
// for (OutputPort<?> op : this.outputPorts) {
// op.getPipe().close();
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("End signal sent, size: " + op.getPipe().size());
// }
// }
// for (OutputPort<?> op : this.outputPorts) {
// op.pipe = null;
......
......@@ -6,6 +6,7 @@ import java.util.List;
import teetime.util.ConstructorClosure;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> {
......@@ -64,4 +65,9 @@ public class EndStage<T> implements StageWithPort<T, T> {
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
// do nothing
}
}
......@@ -2,9 +2,13 @@ package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
public class Relay<T> extends AbstractStage<T, T> {
private SpScPipe<T> inputPipe;
public Relay() {
this.setReschedulable(true);
}
......@@ -13,7 +17,8 @@ public class Relay<T> extends AbstractStage<T, T> {
public void executeWithPorts() {
T element = this.getInputPort().receive();
if (null == element) {
if (this.getInputPort().getPipe().isClosed()) {
// if (this.getInputPort().getPipe().isClosed()) {
if (this.inputPipe.getSignal() == Signal.FINISHED) {
this.setReschedulable(false);
assert 0 == this.getInputPort().getPipe().size();
}
......@@ -24,10 +29,16 @@ public class Relay<T> extends AbstractStage<T, T> {
}
@Override
public void onIsPipelineHead() {
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
public void onStart() {
this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
super.onStart();
}
@Override
public void onIsPipelineHead() {
// if (this.getInputPort().getPipe().isClosed()) {
// this.setReschedulable(false);
// }
}
@Override
......
......@@ -54,10 +54,10 @@ public class Distributor<T> extends AbstractStage<T, T> {
@Override
public void onIsPipelineHead() {
for (OutputPort<T> op : this.outputPortList) {
op.getPipe().close();
System.out.println("End signal sent, size: " + op.getPipe().size());
}
// for (OutputPort<T> op : this.outputPortList) {
// op.getPipe().close();
// System.out.println("End signal sent, size: " + op.getPipe().size());
// }
}
@Override
......@@ -83,4 +83,5 @@ public class Distributor<T> extends AbstractStage<T, T> {
this.execute5(element);
}
}
......@@ -24,6 +24,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
......@@ -159,7 +160,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.create());
}
startPipe.close();
// startPipe.close();
startPipe.setSignal(Signal.FINISHED);
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment