diff --git a/src/main/java/teetime/util/list/CommittableQueue.java b/src/main/java/teetime/util/list/CommittableQueue.java index 83f43f1bd7f4caec05c88bd9d366a60f1e19b966..686ef8f8328d700c79116b43eb48f884314c4173 100644 --- a/src/main/java/teetime/util/list/CommittableQueue.java +++ b/src/main/java/teetime/util/list/CommittableQueue.java @@ -24,4 +24,6 @@ public interface CommittableQueue<T> { T getTail(); + T removeFromHead(); + } diff --git a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java index 33a78aa4ef7fece974eceded10b07c951116f4dd..a13d43b75147c16ba5e969c9407efc7a3d600b15 100644 --- a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java +++ b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java @@ -13,8 +13,8 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { public CommittableResizableArrayQueue(final Object emptyObject, final int initialCapacity) { super(); this.arrayPool = new ArrayPool<T>(); - this.MIN_CAPACITY = initialCapacity; - this.elements = this.arrayPool.acquire(initialCapacity); + this.MIN_CAPACITY = initialCapacity + 1; + this.elements = this.arrayPool.acquire(initialCapacity + 1); this.elements[0] = (T) emptyObject; // optimization: avoids the use of an index out-of-bounds check this.clear(); @@ -107,4 +107,11 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { private final int capacity() { return this.elements.length - 1; } + + @Override + public T removeFromHead() { + T element = this.removeFromHeadUncommitted(); + this.commit(); + return element; + } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java index 9814f30ff0dd8a670583f9a3cb95750f7a486352..85bb41dc4e2b4f151ab4f10a97e4cc99ea56d421 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java @@ -1,36 +1,52 @@ package teetime.examples.throughput.methodcall; +import teetime.util.list.CommittableQueue; +import teetime.util.list.CommittableResizableArrayQueue; + public abstract class AbstractStage<I, O> implements Stage<I, O> { - private final InputPort<I> inputPort = new InputPort<I>(); - private final OutputPort<O> outputPort = new OutputPort<O>(); + // private final InputPort<I> inputPort = new InputPort<I>(); + // private final OutputPort<O> outputPort = new OutputPort<O>(); - @Override - public InputPort<I> getInputPort() { - return this.inputPort; - } + CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); - @Override - public OutputPort<O> getOutputPort() { - return this.outputPort; - } + // @Override + // public InputPort<I> getInputPort() { + // return this.inputPort; + // } + + // @Override + // public OutputPort<O> getOutputPort() { + // return this.outputPort; + // } @Override - public final void execute2() { + public final CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + // pass through the end signal - InputPort<I> port = this.getInputPort(); - if (port.pipe != null) { - I element = port.read(); + // InputPort<I> port = this.getInputPort(); + if (elements != null) { + // I element = port.read(); + I element = elements.getTail(); if (element == END_SIGNAL) { - this.getOutputPort().send((O) END_SIGNAL); - return; + this.send((O) END_SIGNAL); + } else { + // elements = this.getInputPort().pipe.getElements(); + this.execute4(elements); } + } else { + throw new IllegalStateException(); } - this.execute3(); + this.outputElements.commit(); + return this.outputElements; } - protected abstract void execute3(); + // protected abstract void execute3(); - // protected abstract O[] execute4(I[] elements, int size); + protected abstract void execute4(CommittableQueue<I> elements); + + void send(final O element) { + this.outputElements.addToTailUncommitted(element); + } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java b/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java index dcca4f35b0eb783f41709f0361eb55c9bb97e1c4..e225181d76e45d5dd024dacfecd827f9c34b72dd 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java +++ b/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java @@ -17,12 +17,14 @@ package teetime.examples.throughput.methodcall; import java.util.List; +import teetime.util.list.CommittableQueue; + /** * @author Christian Wulf * * @since 1.10 */ -public class CollectorSink<T> extends AbstractStage<T, T> { +public class CollectorSink<T> extends AbstractStage<T, Void> { private static final int THRESHOLD = 10000; @@ -42,13 +44,23 @@ public class CollectorSink<T> extends AbstractStage<T, T> { } } - @Override - public void execute3() { - T element = this.getInputPort().receive(); + // @Override + // public void execute3() { + // T element = this.getInputPort().receive(); + // + // this.elements.add(element); + // if ((this.elements.size() % THRESHOLD) == 0) { + // System.out.println("size: " + this.elements.size()); + // } + // } + @Override + protected void execute4(final CommittableQueue<T> elements) { + T element = elements.removeFromHead(); this.elements.add(element); if ((this.elements.size() % THRESHOLD) == 0) { System.out.println("size: " + this.elements.size()); } } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java index fd5794ec580cd941f318af8ccda03444d991cf6a..0c34dc3f3b6a0d85f780994ffb157931a0085279 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java @@ -20,6 +20,8 @@ import java.util.concurrent.Callable; import teetime.examples.throughput.TimestampObject; import teetime.framework.core.Analysis; +import teetime.util.list.CommittableQueue; +import teetime.util.list.CommittableResizableArrayQueue; /** * @author Christian Wulf @@ -56,7 +58,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline pipeline = new Pipeline(); + final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); @@ -66,14 +68,18 @@ public class MethodCallThroughputAnalysis2 extends Analysis { // pipeline.getInputPort().pipe = new Pipe<Void>(); // pipeline.getInputPort().pipe.add(new Object()); - pipeline.getOutputPort().pipe = new Pipe<Void>(); + // pipeline.getOutputPort().pipe = new Pipe<Void>(); final Runnable runnable = new Runnable() { @Override public void run() { pipeline.onStart(); - while (!(pipeline.getOutputPort().pipe.readLast() == Stage.END_SIGNAL)) { - pipeline.execute2(); + + CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0); + CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0); + + while (!(outputQueue.getTail() == Stage.END_SIGNAL)) { + outputQueue = pipeline.execute2(inputQueue); } } }; diff --git a/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java b/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java index db6d09b870a833e38ac916d13f3e8f574fb4a191..0012b0bbf393861cbf1d929d5f1ecf741fa8fc99 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java +++ b/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java @@ -15,6 +15,8 @@ ***************************************************************************/ package teetime.examples.throughput.methodcall; +import teetime.util.list.CommittableQueue; + /** * @author Christian Wulf * @@ -26,9 +28,16 @@ public class NoopFilter<T> extends AbstractStage<T, T> { return obj; } + // @Override + // public void execute3() { + // T element = this.getInputPort().receive(); + // // this.getOutputPort().send(element); + // } + @Override - public void execute3() { - T element = this.getInputPort().receive(); - this.getOutputPort().send(element); + protected void execute4(final CommittableQueue<T> elements) { + T element = elements.removeFromHead(); + this.send(element); } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java index 0e7b0d772e0541842bdf29b154ceadb0e8d18061..6782445f5918bed21aa9c28272377b253d886570 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java +++ b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java @@ -17,6 +17,8 @@ package teetime.examples.throughput.methodcall; import java.util.concurrent.Callable; +import teetime.util.list.CommittableQueue; + /** * @author Christian Wulf * @@ -66,10 +68,27 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> { this.inputObjectCreator = inputObjectCreator; } + // @Override + // protected void execute3() { + // if (this.numInputObjects == 0) { + // // this.getOutputPort().send((T) END_SIGNAL); + // return; + // } + // + // try { + // final T newObject = this.inputObjectCreator.call(); + // this.numInputObjects--; + // + // // this.getOutputPort().send(newObject); + // } catch (final Exception e) { + // throw new IllegalStateException(e); + // } + // } + @Override - protected void execute3() { + protected void execute4(final CommittableQueue<Void> elements) { if (this.numInputObjects == 0) { - this.getOutputPort().send((T) END_SIGNAL); + this.send((T) END_SIGNAL); return; } @@ -77,7 +96,7 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> { final T newObject = this.inputObjectCreator.call(); this.numInputObjects--; - this.getOutputPort().send(newObject); + this.send(newObject); } catch (final Exception e) { throw new IllegalStateException(e); } diff --git a/src/test/java/teetime/examples/throughput/methodcall/Pipe.java b/src/test/java/teetime/examples/throughput/methodcall/Pipe.java index 0ce187c77c2d10d5bc396b5dec6fa34dabb7aa4b..18afaa07d2e6a1cbf642a6db752d91bf8c49c206 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Pipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Pipe.java @@ -24,4 +24,9 @@ public class Pipe<T> { public T readLast() { return this.elements.getTail(); } + + public CommittableResizableArrayQueue<T> getElements() { + return this.elements; + } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java index 3a1df2f0795f7fbeed8c546614b719c920442d88..1ecc4fdece8c09c162ce8b21ebd4d87af011c090 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java @@ -4,13 +4,15 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; -public class Pipeline implements Stage { +import teetime.util.list.CommittableQueue; + +public class Pipeline<I, O> implements Stage<I, O> { private Stage firstStage; private final List<Stage> intermediateStages = new LinkedList<Stage>(); private Stage lastStage; - void setFirstStage(final Stage stage) { + void setFirstStage(final Stage<I, ?> stage) { this.firstStage = stage; } @@ -22,17 +24,17 @@ public class Pipeline implements Stage { this.intermediateStages.add(stage); } - void setLastStage(final Stage stage) { + void setLastStage(final Stage<?, O> stage) { this.lastStage = stage; } @Override - public void execute2() { - this.firstStage.execute2(); + public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + CommittableQueue queue = this.firstStage.execute2(elements); for (Stage<?, ?> stage : this.intermediateStages) { - stage.execute2(); + queue = stage.execute2(queue); } - this.lastStage.execute2(); + return this.lastStage.execute2(queue); } void onStart() { @@ -40,32 +42,32 @@ public class Pipeline implements Stage { // this.outputPort.pipe = pipe; // this.firstStage.getInputPort().pipe = pipe; - Pipe pipe = new Pipe(); - this.firstStage.getOutputPort().pipe = pipe; - this.intermediateStages.get(0).getInputPort().pipe = pipe; - - for (int i = 0; i < this.intermediateStages.size() - 1; i++) { - Stage left = this.intermediateStages.get(i); - Stage right = this.intermediateStages.get(i + 1); - - pipe = new Pipe(); - left.getOutputPort().pipe = pipe; - right.getInputPort().pipe = pipe; - } - - pipe = new Pipe(); - this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe; - this.lastStage.getInputPort().pipe = pipe; - } - - @Override - public InputPort getInputPort() { - return this.firstStage.getInputPort(); - } - - @Override - public OutputPort getOutputPort() { - return this.lastStage.getOutputPort(); + // Pipe pipe = new Pipe(); + // this.firstStage.getOutputPort().pipe = pipe; + // this.intermediateStages.get(0).getInputPort().pipe = pipe; + // + // for (int i = 0; i < this.intermediateStages.size() - 1; i++) { + // Stage left = this.intermediateStages.get(i); + // Stage right = this.intermediateStages.get(i + 1); + // + // pipe = new Pipe(); + // left.getOutputPort().pipe = pipe; + // right.getInputPort().pipe = pipe; + // } + // + // pipe = new Pipe(); + // this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe; + // this.lastStage.getInputPort().pipe = pipe; } + // + // @Override + // public InputPort getInputPort() { + // return this.firstStage.getInputPort(); + // } + + // @Override + // public OutputPort getOutputPort() { + // return this.lastStage.getOutputPort(); + // } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java index d5d604c8c37143833f205c047a42d9e067fbc983..e437606b24b69618e58abf81c0a0a72138b0ce95 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java @@ -1,12 +1,16 @@ package teetime.examples.throughput.methodcall; +import teetime.util.list.CommittableQueue; + public interface Stage<I, O> { public static final Object END_SIGNAL = new Object(); - void execute2(); + // CommittableQueue<O> execute2(); + + // InputPort<I> getInputPort(); - InputPort<I> getInputPort(); + CommittableQueue<O> execute2(CommittableQueue<I> elements); - OutputPort<O> getOutputPort(); + // OutputPort<O> getOutputPort(); } diff --git a/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java b/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java index afa6721b101ea2df38fd31d58008cb80f2533e0f..7de7da5fc4c56cbbf096871ea467b95f11bbc453 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java +++ b/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java @@ -16,6 +16,7 @@ package teetime.examples.throughput.methodcall; import teetime.examples.throughput.TimestampObject; +import teetime.util.list.CommittableQueue; /** * @author Christian Wulf @@ -29,10 +30,17 @@ public class StartTimestampFilter extends AbstractStage<TimestampObject, Timesta return obj; } + // @Override + // public void execute3() { + // TimestampObject element = this.getInputPort().receive(); + // element.setStartTimestamp(System.nanoTime()); + // // this.getOutputPort().send(element); + // } + @Override - public void execute3() { - TimestampObject element = this.getInputPort().receive(); + protected void execute4(final CommittableQueue<TimestampObject> elements) { + TimestampObject element = elements.removeFromHead(); element.setStartTimestamp(System.nanoTime()); - this.getOutputPort().send(element); + this.send(element); } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java b/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java index e791904744367feb970b3c84f5f06b754d5a7fdb..92fe8211c8029e1995b6c54faa3f77e5ae1a9eba 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java +++ b/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java @@ -16,6 +16,7 @@ package teetime.examples.throughput.methodcall; import teetime.examples.throughput.TimestampObject; +import teetime.util.list.CommittableQueue; /** * @author Christian Wulf @@ -29,10 +30,17 @@ public class StopTimestampFilter extends AbstractStage<TimestampObject, Timestam return obj; } + // @Override + // public void execute3() { + // TimestampObject element = this.getInputPort().receive(); + // element.setStopTimestamp(System.nanoTime()); + // // this.getOutputPort().send(element); + // } + @Override - public void execute3() { - TimestampObject element = this.getInputPort().receive(); + protected void execute4(final CommittableQueue<TimestampObject> elements) { + TimestampObject element = elements.removeFromHead(); element.setStopTimestamp(System.nanoTime()); - this.getOutputPort().send(element); + this.send(element); } }