Skip to content
Snippets Groups Projects
Commit b93e2ab2 authored by Christian Wulf's avatar Christian Wulf
Browse files

added cached access on target stage

parent aa03dfa7
No related branches found
No related tags found
No related merge requests found
Showing
with 49 additions and 9 deletions
eclipse.preferences.version=1
org.moreunit.preferences.version=2
......@@ -22,6 +22,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private boolean reschedulable;
/**
* cached successor for default output port
*/
private StageWithPort<?, ?> next;
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
......@@ -81,7 +86,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
// StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
do {
next.executeWithPorts();
......@@ -102,6 +108,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
@Override
public void onStart() {
// empty default implementation
this.next = (this.outputPort.getPipe() != null) ? this.outputPort.getPipe().getTargetPort().getOwningStage() : null;
}
@Override
......
......@@ -26,6 +26,11 @@ public class InputPort<T> {
return this.pipe;
}
/**
* Connects this input port with the given <code>pipe</code> bi-directionally
*
* @param pipe
*/
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
......
......@@ -5,6 +5,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class OutputPort<T> {
private IPipe<T> pipe;
/**
* Performance cache: Avoids the following method chain
*
* <pre>
* this.getPipe().getTargetPort().getOwningStage()
* </pre>
*/
private StageWithPort<?, ?> cachedTargetStage;
public void send(final T element) {
this.pipe.add(element);
......@@ -17,4 +25,13 @@ public class OutputPort<T> {
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
}
public StageWithPort<?, ?> getCachedTargetStage() {
return this.cachedTargetStage;
}
public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) {
this.cachedTargetStage = cachedTargetStage;
}
}
......@@ -22,6 +22,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
// multiple input ports?
private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage;
// private int startIndex;
......
......@@ -22,6 +22,7 @@ public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new OrderedGrowableArrayPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
......
......@@ -21,6 +21,7 @@ public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new OrderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
......
......@@ -12,6 +12,7 @@ public class Pipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new Pipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
/*
......
......@@ -12,6 +12,7 @@ public class SingleElementPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new SingleElementPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
......
......@@ -16,6 +16,7 @@ public class SpScPipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new SpScPipe<T>(initialCapacity);
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
......
......@@ -46,6 +46,7 @@ public class UnorderedGrowablePipe<T> extends AbstractPipe<T> {
IPipe<T> pipe = new UnorderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
@Override
......
......@@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/
public class CollectorSink<T> extends ConsumerStage<T, Object> {
private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead
private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead
private final List<T> elements;
......
......@@ -22,8 +22,9 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Delay;
......@@ -99,15 +100,15 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
UnorderedGrowablePipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment