Skip to content
Snippets Groups Projects
Commit 0f98dac4 authored by Christian Wulf's avatar Christian Wulf
Browse files

added recursive version of pipeline

parent e7ddf6c9
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,8 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
private Stage successor;
private boolean reschedulable;
// @Override
// public InputPort<I> getInputPort() {
// return this.inputPort;
......@@ -48,18 +50,10 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
// throw new IllegalStateException();
// }
// boolean inputIsEmpty = elements.isEmpty();
this.execute4(elements);
this.outputElements.commit();
// boolean outputIsEmpty = this.outputElements.isEmpty();
//
// if (inputIsEmpty && outputIsEmpty) {
// this.disable();
// }
return this.outputElements;
}
......@@ -69,6 +63,11 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
protected final void send(final O element) {
this.outputElements.addToTailUncommitted(element);
this.outputElements.commit();
do {
CommittableQueue execute = this.next().execute2(this.outputElements);
} while (this.next().isReschedulable());
}
@Override
......@@ -117,4 +116,13 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
this.successor = successor;
}
@Override
public boolean isReschedulable() {
return this.reschedulable;
}
public void setReschedulable(final boolean reschedulable) {
this.reschedulable = reschedulable;
}
}
......@@ -12,7 +12,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
return this.outputElements;
}
return super.execute2(elements);
CommittableQueue<O> output = super.execute2(elements);
this.setReschedulable(!elements.isEmpty()); // costs ~1200 ns on chw-work
return output;
}
}
......@@ -80,7 +80,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
do {
outputQueue = pipeline.execute2(inputQueue);
} while (pipeline.getSchedulingInformation().isActive());
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
......
......@@ -75,11 +75,13 @@ public class MethodCallThroughputAnalysis8 extends Analysis {
final AbstractStage[] stages = stageList.toArray(new AbstractStage[0]);
final WrappingPipeline pipeline = new WrappingPipeline() {
private int startIndex;
@Override
public boolean execute() {
// using the foreach for arrays (i.e., w/o using an iterator variable) increases the performance from 200ms to 130ms
Object element = null;
for (int i = 0; i < stages.length; i++) {
for (int i = this.startIndex; i < stages.length; i++) {
Stage stage = stages[i];
element = stage.execute(element);
if (element == null) {
......
......@@ -40,6 +40,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override
public T execute(final Object element) {
if (this.numInputObjects == 0) {
this.setReschedulable(false);
return null;
}
......@@ -89,6 +90,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override
protected void execute4(final CommittableQueue<Void> elements) {
if (this.numInputObjects == 0) {
this.setReschedulable(false);
return;
}
......
......@@ -20,6 +20,8 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
private int startIndex;
private OnDisableListener listener;
private boolean reschedulable;
void setFirstStage(final Stage<I, ?> stage) {
this.firstStage = stage;
}
......@@ -46,10 +48,14 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
CommittableQueue queue = elements;
for (int i = this.startIndex; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i];
queue = stage.execute2(queue);
}
// for (int i = this.startIndex; i < this.stages.length; i++) {
// Stage<?, ?> stage = this.stages[i];
// queue = stage.execute2(queue);
// }
this.stages[0].execute2(elements);
this.setReschedulable(this.stages[0].isReschedulable());
return queue;
}
......@@ -89,6 +95,11 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
stage.setParentStage(this, i);
stage.setListener(this);
}
for (int i = 0; i < this.stages.length - 1; i++) {
Stage<?, ?> stage = this.stages[i];
stage.setSuccessor(this.stages[i + 1]);
}
}
//
......@@ -156,6 +167,15 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
throw new IllegalStateException();
}
@Override
public boolean isReschedulable() {
return this.reschedulable;
}
public void setReschedulable(final boolean reschedulable) {
this.reschedulable = reschedulable;
}
// @Override
// public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort();
......
......@@ -4,6 +4,10 @@ import teetime.util.list.CommittableQueue;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
public ProducerStage() {
this.setReschedulable(true);
}
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue<O> outputElements = super.execute2(elements);
......@@ -15,4 +19,5 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
return outputElements;
}
}
......@@ -27,4 +27,6 @@ public interface Stage<I, O> {
Stage next();
void setSuccessor(Stage<?, ?> successor);
boolean isReschedulable();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment