Skip to content
Snippets Groups Projects
Commit f8d0b388 authored by Christian Wulf's avatar Christian Wulf
Browse files

added end signal strategy with minimal performance loss

parent b1031f69
No related branches found
No related tags found
No related merge requests found
Showing
with 229 additions and 35 deletions
...@@ -21,24 +21,24 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { ...@@ -21,24 +21,24 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
} }
@Override @Override
public T get(final int index) { public final T get(final int index) {
T element = this.elements[index + 1]; T element = this.elements[index + 1];
return element; return element;
} }
@Override @Override
public void addToTailUncommitted(final T element) { public void addToTailUncommitted(final T element) {
if (this.lastFreeIndexUncommitted == this.capacity()) { // if (this.lastFreeIndexUncommitted == this.capacity()) { // TODO uncomment
this.grow(); // this.grow();
} // }
this.put(this.lastFreeIndexUncommitted++, element); this.put(this.lastFreeIndexUncommitted++, element);
} }
@Override @Override
public T removeFromHeadUncommitted() { public T removeFromHeadUncommitted() {
if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) { // if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) { // TODO uncomment
this.shrink(); // this.shrink();
} // }
T element = this.get(--this.lastFreeIndexUncommitted); T element = this.get(--this.lastFreeIndexUncommitted);
return element; return element;
} }
......
...@@ -3,12 +3,20 @@ package teetime.examples.throughput.methodcall; ...@@ -3,12 +3,20 @@ package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue; import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue; import teetime.util.list.CommittableResizableArrayQueue;
public abstract class AbstractStage<I, O> implements Stage<I, O> { abstract class AbstractStage<I, O> implements Stage<I, O> {
// private final InputPort<I> inputPort = new InputPort<I>(); // private final InputPort<I> inputPort = new InputPort<I>();
// private final OutputPort<O> outputPort = new OutputPort<O>(); // private final OutputPort<O> outputPort = new OutputPort<O>();
CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage parentStage;
private OnDisableListener listener;
private int index;
// @Override // @Override
// public InputPort<I> getInputPort() { // public InputPort<I> getInputPort() {
...@@ -21,24 +29,35 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -21,24 +29,35 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
// } // }
@Override @Override
public final CommittableQueue<O> execute2(final CommittableQueue<I> elements) { public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
// pass through the end signal // pass through the end signal
// InputPort<I> port = this.getInputPort(); // InputPort<I> port = this.getInputPort();
if (elements != null) { // if (elements != null) {
// I element = port.read(); // // I element = port.read();
I element = elements.getTail(); // // I element = elements.getTail();
if (element == END_SIGNAL) { // // if (element == END_SIGNAL) {
this.send((O) END_SIGNAL); // // this.send((O) END_SIGNAL);
} else { // // } else {
// elements = this.getInputPort().pipe.getElements(); // // // elements = this.getInputPort().pipe.getElements();
this.execute4(elements); // // }
} //
} else { // this.execute4(elements);
throw new IllegalStateException(); // } else {
} // throw new IllegalStateException();
// }
// boolean inputIsEmpty = elements.isEmpty();
this.execute4(elements);
this.outputElements.commit(); this.outputElements.commit();
// boolean outputIsEmpty = this.outputElements.isEmpty();
//
// if (inputIsEmpty && outputIsEmpty) {
// this.disable();
// }
return this.outputElements; return this.outputElements;
} }
...@@ -46,7 +65,43 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> { ...@@ -46,7 +65,43 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
protected abstract void execute4(CommittableQueue<I> elements); protected abstract void execute4(CommittableQueue<I> elements);
void send(final O element) { protected final void send(final O element) {
this.outputElements.addToTailUncommitted(element); this.outputElements.addToTailUncommitted(element);
} }
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
@Override
public Stage getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
public OnDisableListener getListener() {
return this.listener;
}
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
} }
...@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue; ...@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
* *
* @since 1.10 * @since 1.10
*/ */
public class CollectorSink<T> extends AbstractStage<T, Void> { public class CollectorSink<T> extends ConsumerStage<T, Void> {
private static final int THRESHOLD = 10000; private static final int THRESHOLD = 10000;
......
package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
boolean inputIsEmpty = elements.isEmpty();
if (inputIsEmpty) {
this.disable();
return this.outputElements;
}
return super.execute2(elements);
}
}
...@@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis { ...@@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0); CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0); CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
while (!(outputQueue.getTail() == Stage.END_SIGNAL)) { while (pipeline.getSchedulingInformation().isActive()) {
outputQueue = pipeline.execute2(inputQueue); outputQueue = pipeline.execute2(inputQueue);
} }
} }
......
...@@ -22,7 +22,7 @@ import teetime.util.list.CommittableQueue; ...@@ -22,7 +22,7 @@ import teetime.util.list.CommittableQueue;
* *
* @since 1.10 * @since 1.10
*/ */
public class NoopFilter<T> extends AbstractStage<T, T> { public class NoopFilter<T> extends ConsumerStage<T, T> {
public T execute(final T obj) { public T execute(final T obj) {
return obj; return obj;
......
...@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue; ...@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
* *
* @since 1.10 * @since 1.10
*/ */
public class ObjectProducer<T> extends AbstractStage<Void, T> { public class ObjectProducer<T> extends ProducerStage<Void, T> {
private long numInputObjects; private long numInputObjects;
private Callable<T> inputObjectCreator; private Callable<T> inputObjectCreator;
...@@ -88,7 +88,6 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> { ...@@ -88,7 +88,6 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> {
@Override @Override
protected void execute4(final CommittableQueue<Void> elements) { protected void execute4(final CommittableQueue<Void> elements) {
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
this.send((T) END_SIGNAL);
return; return;
} }
......
package teetime.examples.throughput.methodcall;
public interface OnDisableListener {
void onDisable(Stage stage, int index);
}
...@@ -6,12 +6,20 @@ import java.util.List; ...@@ -6,12 +6,20 @@ import java.util.List;
import teetime.util.list.CommittableQueue; import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements Stage<I, O> { public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
private Stage firstStage; private Stage firstStage;
private final List<Stage> intermediateStages = new LinkedList<Stage>(); private final List<Stage> intermediateStages = new LinkedList<Stage>();
private Stage lastStage; private Stage lastStage;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage[] stages;
private Stage parentStage;
private int index;
private int startIndex;
private OnDisableListener listener;
void setFirstStage(final Stage<I, ?> stage) { void setFirstStage(final Stage<I, ?> stage) {
this.firstStage = stage; this.firstStage = stage;
} }
...@@ -30,11 +38,19 @@ public class Pipeline<I, O> implements Stage<I, O> { ...@@ -30,11 +38,19 @@ public class Pipeline<I, O> implements Stage<I, O> {
@Override @Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue queue = this.firstStage.execute2(elements); // CommittableQueue queue = this.firstStage.execute2(elements);
for (Stage<?, ?> stage : this.intermediateStages) { // for (Stage<?, ?> stage : this.intermediateStages) {
// queue = stage.execute2(queue);
// }
// return this.lastStage.execute2(queue);
// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
CommittableQueue queue = elements;
for (int i = this.startIndex; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i];
queue = stage.execute2(queue); queue = stage.execute2(queue);
} }
return this.lastStage.execute2(queue); return queue;
} }
void onStart() { void onStart() {
...@@ -58,13 +74,73 @@ public class Pipeline<I, O> implements Stage<I, O> { ...@@ -58,13 +74,73 @@ public class Pipeline<I, O> implements Stage<I, O> {
// pipe = new Pipe(); // pipe = new Pipe();
// this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe; // this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe;
// this.lastStage.getInputPort().pipe = pipe; // this.lastStage.getInputPort().pipe = pipe;
int size = 1 + this.intermediateStages.size() + 1;
this.stages = new Stage[size];
this.stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) {
Stage<?, ?> stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage;
}
this.stages[this.stages.length - 1] = this.lastStage;
for (int i = 0; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i];
stage.setParentStage(this, i);
stage.setListener(this);
}
} }
// //
// @Override // @Override
// public InputPort getInputPort() { // public InputPort getInputPort() {
// return this.firstStage.getInputPort(); // return this.firstStage.getInputPort();
// } // }
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
@Override
public Stage getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
@Override
public void onDisable(final Stage stage, final int index) {
this.startIndex = index + 1;
if (this.startIndex == this.stages.length) {
this.disable();
}
}
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
public OnDisableListener getListener() {
return this.listener;
}
@Override
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
// @Override // @Override
// public OutputPort getOutputPort() { // public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort(); // return this.lastStage.getOutputPort();
......
package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue<O> outputElements = super.execute2(elements);
boolean outputIsEmpty = outputElements.isEmpty();
if (outputIsEmpty) {
this.disable();
}
return outputElements;
}
}
package teetime.examples.throughput.methodcall;
public class SchedulingInformation {
private boolean active = true;
public boolean isActive() {
return this.active;
}
public void setActive(final boolean active) {
this.active = active;
}
}
...@@ -12,5 +12,13 @@ public interface Stage<I, O> { ...@@ -12,5 +12,13 @@ public interface Stage<I, O> {
CommittableQueue<O> execute2(CommittableQueue<I> elements); CommittableQueue<O> execute2(CommittableQueue<I> elements);
SchedulingInformation getSchedulingInformation();
// OutputPort<O> getOutputPort(); // OutputPort<O> getOutputPort();
Stage getParentStage();
void setParentStage(Stage parentStage, int index);
void setListener(OnDisableListener listener);
} }
...@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue; ...@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
* *
* @since 1.10 * @since 1.10
*/ */
public class StartTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> { public class StartTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
public TimestampObject execute(final TimestampObject obj) { public TimestampObject execute(final TimestampObject obj) {
obj.setStartTimestamp(System.nanoTime()); obj.setStartTimestamp(System.nanoTime());
......
...@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue; ...@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
* *
* @since 1.10 * @since 1.10
*/ */
public class StopTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> { public class StopTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
public TimestampObject execute(final TimestampObject obj) { public TimestampObject execute(final TimestampObject obj) {
obj.setStopTimestamp(System.nanoTime()); obj.setStopTimestamp(System.nanoTime());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment