Skip to content
Snippets Groups Projects
Commit 3645dd80 authored by Christian Wulf's avatar Christian Wulf
Browse files

added end signal strategy with minimal performance loss

parent e282bfa4
No related branches found
No related tags found
No related merge requests found
Showing
with 229 additions and 35 deletions
......@@ -21,24 +21,24 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
}
@Override
public T get(final int index) {
public final T get(final int index) {
T element = this.elements[index + 1];
return element;
}
@Override
public void addToTailUncommitted(final T element) {
if (this.lastFreeIndexUncommitted == this.capacity()) {
this.grow();
}
// if (this.lastFreeIndexUncommitted == this.capacity()) { // TODO uncomment
// this.grow();
// }
this.put(this.lastFreeIndexUncommitted++, element);
}
@Override
public T removeFromHeadUncommitted() {
if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) {
this.shrink();
}
// if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) { // TODO uncomment
// this.shrink();
// }
T element = this.get(--this.lastFreeIndexUncommitted);
return element;
}
......
......@@ -3,12 +3,20 @@ package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
public abstract class AbstractStage<I, O> implements Stage<I, O> {
abstract class AbstractStage<I, O> implements Stage<I, O> {
// private final InputPort<I> inputPort = new InputPort<I>();
// private final OutputPort<O> outputPort = new OutputPort<O>();
CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage parentStage;
private OnDisableListener listener;
private int index;
// @Override
// public InputPort<I> getInputPort() {
......@@ -21,24 +29,35 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
// }
@Override
public final CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
// pass through the end signal
// InputPort<I> port = this.getInputPort();
if (elements != null) {
// I element = port.read();
I element = elements.getTail();
if (element == END_SIGNAL) {
this.send((O) END_SIGNAL);
} else {
// elements = this.getInputPort().pipe.getElements();
this.execute4(elements);
}
} else {
throw new IllegalStateException();
}
// if (elements != null) {
// // I element = port.read();
// // I element = elements.getTail();
// // if (element == END_SIGNAL) {
// // this.send((O) END_SIGNAL);
// // } else {
// // // elements = this.getInputPort().pipe.getElements();
// // }
//
// this.execute4(elements);
// } else {
// throw new IllegalStateException();
// }
// boolean inputIsEmpty = elements.isEmpty();
this.execute4(elements);
this.outputElements.commit();
// boolean outputIsEmpty = this.outputElements.isEmpty();
//
// if (inputIsEmpty && outputIsEmpty) {
// this.disable();
// }
return this.outputElements;
}
......@@ -46,7 +65,43 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
protected abstract void execute4(CommittableQueue<I> elements);
void send(final O element) {
protected final void send(final O element) {
this.outputElements.addToTailUncommitted(element);
}
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
@Override
public Stage getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
public OnDisableListener getListener() {
return this.listener;
}
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
}
......@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
*
* @since 1.10
*/
public class CollectorSink<T> extends AbstractStage<T, Void> {
public class CollectorSink<T> extends ConsumerStage<T, Void> {
private static final int THRESHOLD = 10000;
......
package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
boolean inputIsEmpty = elements.isEmpty();
if (inputIsEmpty) {
this.disable();
return this.outputElements;
}
return super.execute2(elements);
}
}
......@@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
while (!(outputQueue.getTail() == Stage.END_SIGNAL)) {
while (pipeline.getSchedulingInformation().isActive()) {
outputQueue = pipeline.execute2(inputQueue);
}
}
......
......@@ -22,7 +22,7 @@ import teetime.util.list.CommittableQueue;
*
* @since 1.10
*/
public class NoopFilter<T> extends AbstractStage<T, T> {
public class NoopFilter<T> extends ConsumerStage<T, T> {
public T execute(final T obj) {
return obj;
......
......@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
*
* @since 1.10
*/
public class ObjectProducer<T> extends AbstractStage<Void, T> {
public class ObjectProducer<T> extends ProducerStage<Void, T> {
private long numInputObjects;
private Callable<T> inputObjectCreator;
......@@ -88,7 +88,6 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> {
@Override
protected void execute4(final CommittableQueue<Void> elements) {
if (this.numInputObjects == 0) {
this.send((T) END_SIGNAL);
return;
}
......
package teetime.examples.throughput.methodcall;
public interface OnDisableListener {
void onDisable(Stage stage, int index);
}
......@@ -6,12 +6,20 @@ import java.util.List;
import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements Stage<I, O> {
public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
private Stage firstStage;
private final List<Stage> intermediateStages = new LinkedList<Stage>();
private Stage lastStage;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage[] stages;
private Stage parentStage;
private int index;
private int startIndex;
private OnDisableListener listener;
void setFirstStage(final Stage<I, ?> stage) {
this.firstStage = stage;
}
......@@ -30,11 +38,19 @@ public class Pipeline<I, O> implements Stage<I, O> {
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue queue = this.firstStage.execute2(elements);
for (Stage<?, ?> stage : this.intermediateStages) {
// CommittableQueue queue = this.firstStage.execute2(elements);
// for (Stage<?, ?> stage : this.intermediateStages) {
// queue = stage.execute2(queue);
// }
// return this.lastStage.execute2(queue);
// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
CommittableQueue queue = elements;
for (int i = this.startIndex; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i];
queue = stage.execute2(queue);
}
return this.lastStage.execute2(queue);
return queue;
}
void onStart() {
......@@ -58,13 +74,73 @@ public class Pipeline<I, O> implements Stage<I, O> {
// pipe = new Pipe();
// this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe;
// this.lastStage.getInputPort().pipe = pipe;
int size = 1 + this.intermediateStages.size() + 1;
this.stages = new Stage[size];
this.stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) {
Stage<?, ?> stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage;
}
this.stages[this.stages.length - 1] = this.lastStage;
for (int i = 0; i < this.stages.length; i++) {
Stage<?, ?> stage = this.stages[i];
stage.setParentStage(this, i);
stage.setListener(this);
}
}
//
// @Override
// public InputPort getInputPort() {
// return this.firstStage.getInputPort();
// }
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
@Override
public Stage getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
@Override
public void onDisable(final Stage stage, final int index) {
this.startIndex = index + 1;
if (this.startIndex == this.stages.length) {
this.disable();
}
}
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
public OnDisableListener getListener() {
return this.listener;
}
@Override
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
// @Override
// public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort();
......
package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
CommittableQueue<O> outputElements = super.execute2(elements);
boolean outputIsEmpty = outputElements.isEmpty();
if (outputIsEmpty) {
this.disable();
}
return outputElements;
}
}
package teetime.examples.throughput.methodcall;
public class SchedulingInformation {
private boolean active = true;
public boolean isActive() {
return this.active;
}
public void setActive(final boolean active) {
this.active = active;
}
}
......@@ -12,5 +12,13 @@ public interface Stage<I, O> {
CommittableQueue<O> execute2(CommittableQueue<I> elements);
SchedulingInformation getSchedulingInformation();
// OutputPort<O> getOutputPort();
Stage getParentStage();
void setParentStage(Stage parentStage, int index);
void setListener(OnDisableListener listener);
}
......@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
*
* @since 1.10
*/
public class StartTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> {
public class StartTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
public TimestampObject execute(final TimestampObject obj) {
obj.setStartTimestamp(System.nanoTime());
......
......@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
*
* @since 1.10
*/
public class StopTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> {
public class StopTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
public TimestampObject execute(final TimestampObject obj) {
obj.setStopTimestamp(System.nanoTime());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment