Skip to content
Snippets Groups Projects
Commit 22d6aae4 authored by Christian Wulf's avatar Christian Wulf
Browse files

added recursive version of pipeline

parent e8d544bf
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,8 @@ abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -20,6 +20,8 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
private Stage successor; private Stage successor;
private boolean reschedulable;
// @Override // @Override
// public InputPort<I> getInputPort() { // public InputPort<I> getInputPort() {
// return this.inputPort; // return this.inputPort;
...@@ -48,18 +50,10 @@ abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -48,18 +50,10 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
// throw new IllegalStateException(); // throw new IllegalStateException();
// } // }
// boolean inputIsEmpty = elements.isEmpty();
this.execute4(elements); this.execute4(elements);
this.outputElements.commit(); this.outputElements.commit();
// boolean outputIsEmpty = this.outputElements.isEmpty();
//
// if (inputIsEmpty && outputIsEmpty) {
// this.disable();
// }
return this.outputElements; return this.outputElements;
} }
...@@ -69,6 +63,11 @@ abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -69,6 +63,11 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
protected final void send(final O element) { protected final void send(final O element) {
this.outputElements.addToTailUncommitted(element); this.outputElements.addToTailUncommitted(element);
this.outputElements.commit();
do {
CommittableQueue execute = this.next().execute2(this.outputElements);
} while (this.next().isReschedulable());
} }
@Override @Override
...@@ -117,4 +116,13 @@ abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -117,4 +116,13 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
this.successor = successor; this.successor = successor;
} }
@Override
public boolean isReschedulable() {
return this.reschedulable;
}
public void setReschedulable(final boolean reschedulable) {
this.reschedulable = reschedulable;
}
} }
...@@ -12,7 +12,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { ...@@ -12,7 +12,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
return this.outputElements; return this.outputElements;
} }
return super.execute2(elements); CommittableQueue<O> output = super.execute2(elements);
this.setReschedulable(!elements.isEmpty()); // costs ~1200 ns on chw-work
return output;
} }
} }
...@@ -80,7 +80,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis { ...@@ -80,7 +80,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
do { do {
outputQueue = pipeline.execute2(inputQueue); outputQueue = pipeline.execute2(inputQueue);
} while (pipeline.getSchedulingInformation().isActive()); } while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
} }
}; };
......
...@@ -75,11 +75,13 @@ public class MethodCallThroughputAnalysis8 extends Analysis { ...@@ -75,11 +75,13 @@ public class MethodCallThroughputAnalysis8 extends Analysis {
final AbstractStage[] stages = stageList.toArray(new AbstractStage[0]); final AbstractStage[] stages = stageList.toArray(new AbstractStage[0]);
final WrappingPipeline pipeline = new WrappingPipeline() { final WrappingPipeline pipeline = new WrappingPipeline() {
private int startIndex;
@Override @Override
public boolean execute() { public boolean execute() {
// using the foreach for arrays (i.e., w/o using an iterator variable) increases the performance from 200ms to 130ms // using the foreach for arrays (i.e., w/o using an iterator variable) increases the performance from 200ms to 130ms
Object element = null; Object element = null;
for (int i = 0; i < stages.length; i++) { for (int i = this.startIndex; i < stages.length; i++) {
Stage stage = stages[i]; Stage stage = stages[i];
element = stage.execute(element); element = stage.execute(element);
if (element == null) { if (element == null) {
......
...@@ -40,6 +40,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { ...@@ -40,6 +40,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override @Override
public T execute(final Object element) { public T execute(final Object element) {
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
this.setReschedulable(false);
return null; return null;
} }
...@@ -89,6 +90,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { ...@@ -89,6 +90,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override @Override
protected void execute4(final CommittableQueue<Void> elements) { protected void execute4(final CommittableQueue<Void> elements) {
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
this.setReschedulable(false);
return; return;
} }
......
...@@ -20,6 +20,8 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener { ...@@ -20,6 +20,8 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
private int startIndex; private int startIndex;
private OnDisableListener listener; private OnDisableListener listener;
private boolean reschedulable;
void setFirstStage(final Stage<I, ?> stage) { void setFirstStage(final Stage<I, ?> stage) {
this.firstStage = stage; this.firstStage = stage;
} }
...@@ -46,10 +48,14 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener { ...@@ -46,10 +48,14 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution) // below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
CommittableQueue queue = elements; CommittableQueue queue = elements;
for (int i = this.startIndex; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i]; // for (int i = this.startIndex; i < this.stages.length; i++) {
queue = stage.execute2(queue); // Stage<?, ?> stage = this.stages[i];
} // queue = stage.execute2(queue);
// }
this.stages[0].execute2(elements);
this.setReschedulable(this.stages[0].isReschedulable());
return queue; return queue;
} }
...@@ -89,6 +95,11 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener { ...@@ -89,6 +95,11 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
stage.setParentStage(this, i); stage.setParentStage(this, i);
stage.setListener(this); stage.setListener(this);
} }
for (int i = 0; i < this.stages.length - 1; i++) {
Stage<?, ?> stage = this.stages[i];
stage.setSuccessor(this.stages[i + 1]);
}
} }
// //
...@@ -156,6 +167,15 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener { ...@@ -156,6 +167,15 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@Override
public boolean isReschedulable() {
return this.reschedulable;
}
public void setReschedulable(final boolean reschedulable) {
this.reschedulable = reschedulable;
}
// @Override // @Override
// public OutputPort getOutputPort() { // public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort(); // return this.lastStage.getOutputPort();
......
...@@ -4,6 +4,10 @@ import teetime.util.list.CommittableQueue; ...@@ -4,6 +4,10 @@ import teetime.util.list.CommittableQueue;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
public ProducerStage() {
this.setReschedulable(true);
}
@Override @Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue<O> outputElements = super.execute2(elements); CommittableQueue<O> outputElements = super.execute2(elements);
...@@ -15,4 +19,5 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { ...@@ -15,4 +19,5 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
return outputElements; return outputElements;
} }
} }
...@@ -27,4 +27,6 @@ public interface Stage<I, O> { ...@@ -27,4 +27,6 @@ public interface Stage<I, O> {
Stage next(); Stage next();
void setSuccessor(Stage<?, ?> successor); void setSuccessor(Stage<?, ?> successor);
boolean isReschedulable();
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment