Skip to content
Snippets Groups Projects
Commit 1364a26a authored by Christian Wulf's avatar Christian Wulf
Browse files

pipe calls the target stage from now on if it is an intra pipe

parent f1bffd97
No related branches found
No related tags found
No related merge requests found
Showing
with 64 additions and 34 deletions
...@@ -48,11 +48,13 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -48,11 +48,13 @@ public abstract class AbstractStage implements StageWithPort {
return false; return false;
} }
StageWithPort next = outputPort.getCachedTargetStage(); outputPort.reportNewElement();
do { // StageWithPort next = outputPort.getCachedTargetStage();
next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead //
} while (next.isReschedulable()); // do {
// next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
// } while (next.isReschedulable());
return true; return true;
} }
......
...@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core; ...@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public class OutputPort<T> extends AbstractPort<T> { public final class OutputPort<T> extends AbstractPort<T> {
/** /**
* Performance cache: Avoids the following method chain * Performance cache: Avoids the following method chain
...@@ -11,7 +11,7 @@ public class OutputPort<T> extends AbstractPort<T> { ...@@ -11,7 +11,7 @@ public class OutputPort<T> extends AbstractPort<T> {
* this.getPipe().getTargetPort().getOwningStage() * this.getPipe().getTargetPort().getOwningStage()
* </pre> * </pre>
*/ */
private StageWithPort cachedTargetStage; // private StageWithPort cachedTargetStage;
OutputPort() { OutputPort() {
super(); super();
...@@ -26,16 +26,21 @@ public class OutputPort<T> extends AbstractPort<T> { ...@@ -26,16 +26,21 @@ public class OutputPort<T> extends AbstractPort<T> {
return this.pipe.add(element); return this.pipe.add(element);
} }
public StageWithPort getCachedTargetStage() { // public StageWithPort getCachedTargetStage() {
return this.cachedTargetStage; // return this.cachedTargetStage;
} // }
@Deprecated
public void setCachedTargetStage(final StageWithPort cachedTargetStage) { public void setCachedTargetStage(final StageWithPort cachedTargetStage) {
this.cachedTargetStage = cachedTargetStage; // this.cachedTargetStage = cachedTargetStage;
} }
public void sendSignal(final Signal signal) { public void sendSignal(final Signal signal) {
this.pipe.setSignal(signal); this.pipe.setSignal(signal);
} }
public void reportNewElement() {
this.pipe.reportNewElement();
}
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public abstract class AbstractPipe<T> implements IPipe<T> { public abstract class AbstractPipe<T> implements IPipe<T> {
// private final AtomicBoolean closed = new AtomicBoolean();
private InputPort<T> targetPort; private InputPort<T> targetPort;
// @Override /**
// public boolean isClosed() { * Performance cache: Avoids the following method chain
// return this.closed.get(); *
// } * <pre>
// * this.getPipe().getTargetPort().getOwningStage()
// @Override * </pre>
// public void close() { */
// this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement protected StageWithPort cachedTargetStage;
// }
@Override @Override
public InputPort<T> getTargetPort() { public InputPort<T> getTargetPort() {
...@@ -25,6 +24,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> { ...@@ -25,6 +24,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
@Override @Override
public void setTargetPort(final InputPort<T> targetPort) { public void setTargetPort(final InputPort<T> targetPort) {
this.targetPort = targetPort; this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage();
} }
} }
...@@ -52,4 +52,9 @@ public final class DummyPipe implements IPipe { ...@@ -52,4 +52,9 @@ public final class DummyPipe implements IPipe {
@Override @Override
public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
@Override
public void reportNewElement() {
// do nothing
}
} }
...@@ -29,4 +29,6 @@ public interface IPipe<T> { ...@@ -29,4 +29,6 @@ public interface IPipe<T> {
// BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...} // BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...}
void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort); void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
void reportNewElement();
} }
...@@ -7,8 +7,13 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { ...@@ -7,8 +7,13 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
@Override @Override
public void setSignal(final Signal signal) { public void setSignal(final Signal signal) {
if (this.getTargetPort() != null) { if (this.getTargetPort() != null) {
this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); this.cachedTargetStage.onSignal(signal, this.getTargetPort());
} }
} }
@Override
public final void reportNewElement() {
this.cachedTargetStage.executeWithPorts();
}
} }
...@@ -83,4 +83,9 @@ public class SpScPipe<T> extends AbstractPipe<T> { ...@@ -83,4 +83,9 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.signal.get(); return this.signal.get();
} }
@Override
public void reportNewElement() {
// do nothing
}
} }
...@@ -89,10 +89,10 @@ public class ChwWorkComparisonMethodcallWithPorts implements PerformanceCheckPro ...@@ -89,10 +89,10 @@ public class ChwWorkComparisonMethodcallWithPorts implements PerformanceCheckPro
// assertEquals(53, value17, 4.1); // +0 // assertEquals(53, value17, 4.1); // +0
// since 27.08.2014 (incl.) // since 27.08.2014 (incl.)
assertEquals(112, value14, 5.1); // +16 assertEquals(102, value14, 5.1); // +16
assertEquals(42, value10, 2.1); // +16 assertEquals(56, value10, 2.1); // +30
assertEquals(41, value11, 4.1); // -3 assertEquals(64, value11, 4.1); // +15
assertEquals(42, value9, 2.1); // +6 assertEquals(77, value9, 2.1); // +35
assertEquals(44, value15, 4.1); // +0 assertEquals(44, value15, 4.1); // +0
assertEquals(53, value17, 4.1); // +0 assertEquals(53, value17, 4.1); // +0
......
...@@ -23,7 +23,9 @@ import teetime.variant.explicitScheduling.framework.core.Analysis; ...@@ -23,7 +23,9 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer; import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
...@@ -37,8 +39,6 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; ...@@ -37,8 +39,6 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
*/ */
public class MethodCallThroughputAnalysis14 extends Analysis { public class MethodCallThroughputAnalysis14 extends Analysis {
private static final int SPSC_INITIAL_CAPACITY = 4;
private long numInputObjects; private long numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator; private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters; private int numNoopFilters;
...@@ -75,13 +75,19 @@ public class MethodCallThroughputAnalysis14 extends Analysis { ...@@ -75,13 +75,19 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
pipeline.addIntermediateStage(stopTimestampFilter); pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY); PipeFactory pipeFactory = new PipeFactory();
SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort(), SPSC_INITIAL_CAPACITY); IPipe<TimestampObject> pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) { for (int i = 0; i < noopFilters.length - 1; i++) {
SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort(), SPSC_INITIAL_CAPACITY); pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
} }
SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY); pipe = pipeFactory.create(ThreadCommunication.INTRA);
SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort(), SPSC_INITIAL_CAPACITY); pipe.connectPorts(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return pipeline; return pipeline;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment