Skip to content
Snippets Groups Projects
Commit b1031f69 authored by Christian Wulf's avatar Christian Wulf
Browse files

changed input/output access from parameter/to return

parent 6cfdb7bd
Branches
Tags
No related merge requests found
Showing
with 175 additions and 77 deletions
...@@ -24,4 +24,6 @@ public interface CommittableQueue<T> { ...@@ -24,4 +24,6 @@ public interface CommittableQueue<T> {
T getTail(); T getTail();
T removeFromHead();
} }
...@@ -13,8 +13,8 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { ...@@ -13,8 +13,8 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
public CommittableResizableArrayQueue(final Object emptyObject, final int initialCapacity) { public CommittableResizableArrayQueue(final Object emptyObject, final int initialCapacity) {
super(); super();
this.arrayPool = new ArrayPool<T>(); this.arrayPool = new ArrayPool<T>();
this.MIN_CAPACITY = initialCapacity; this.MIN_CAPACITY = initialCapacity + 1;
this.elements = this.arrayPool.acquire(initialCapacity); this.elements = this.arrayPool.acquire(initialCapacity + 1);
this.elements[0] = (T) emptyObject; // optimization: avoids the use of an index out-of-bounds check this.elements[0] = (T) emptyObject; // optimization: avoids the use of an index out-of-bounds check
this.clear(); this.clear();
...@@ -107,4 +107,11 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { ...@@ -107,4 +107,11 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
private final int capacity() { private final int capacity() {
return this.elements.length - 1; return this.elements.length - 1;
} }
@Override
public T removeFromHead() {
T element = this.removeFromHeadUncommitted();
this.commit();
return element;
}
} }
package teetime.examples.throughput.methodcall; package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
public abstract class AbstractStage<I, O> implements Stage<I, O> { public abstract class AbstractStage<I, O> implements Stage<I, O> {
private final InputPort<I> inputPort = new InputPort<I>(); // private final InputPort<I> inputPort = new InputPort<I>();
private final OutputPort<O> outputPort = new OutputPort<O>(); // private final OutputPort<O> outputPort = new OutputPort<O>();
@Override CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
public InputPort<I> getInputPort() {
return this.inputPort;
}
@Override // @Override
public OutputPort<O> getOutputPort() { // public InputPort<I> getInputPort() {
return this.outputPort; // return this.inputPort;
} // }
// @Override
// public OutputPort<O> getOutputPort() {
// return this.outputPort;
// }
@Override @Override
public final void execute2() { public final CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
// pass through the end signal // pass through the end signal
InputPort<I> port = this.getInputPort(); // InputPort<I> port = this.getInputPort();
if (port.pipe != null) { if (elements != null) {
I element = port.read(); // I element = port.read();
I element = elements.getTail();
if (element == END_SIGNAL) { if (element == END_SIGNAL) {
this.getOutputPort().send((O) END_SIGNAL); this.send((O) END_SIGNAL);
return; } else {
// elements = this.getInputPort().pipe.getElements();
this.execute4(elements);
} }
} else {
throw new IllegalStateException();
} }
this.execute3(); this.outputElements.commit();
return this.outputElements;
} }
protected abstract void execute3(); // protected abstract void execute3();
// protected abstract O[] execute4(I[] elements, int size); protected abstract void execute4(CommittableQueue<I> elements);
void send(final O element) {
this.outputElements.addToTailUncommitted(element);
}
} }
...@@ -17,12 +17,14 @@ package teetime.examples.throughput.methodcall; ...@@ -17,12 +17,14 @@ package teetime.examples.throughput.methodcall;
import java.util.List; import java.util.List;
import teetime.util.list.CommittableQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class CollectorSink<T> extends AbstractStage<T, T> { public class CollectorSink<T> extends AbstractStage<T, Void> {
private static final int THRESHOLD = 10000; private static final int THRESHOLD = 10000;
...@@ -42,13 +44,23 @@ public class CollectorSink<T> extends AbstractStage<T, T> { ...@@ -42,13 +44,23 @@ public class CollectorSink<T> extends AbstractStage<T, T> {
} }
} }
@Override // @Override
public void execute3() { // public void execute3() {
T element = this.getInputPort().receive(); // T element = this.getInputPort().receive();
//
// this.elements.add(element);
// if ((this.elements.size() % THRESHOLD) == 0) {
// System.out.println("size: " + this.elements.size());
// }
// }
@Override
protected void execute4(final CommittableQueue<T> elements) {
T element = elements.removeFromHead();
this.elements.add(element); this.elements.add(element);
if ((this.elements.size() % THRESHOLD) == 0) { if ((this.elements.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.elements.size()); System.out.println("size: " + this.elements.size());
} }
} }
} }
...@@ -20,6 +20,8 @@ import java.util.concurrent.Callable; ...@@ -20,6 +20,8 @@ import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject; import teetime.examples.throughput.TimestampObject;
import teetime.framework.core.Analysis; import teetime.framework.core.Analysis;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
...@@ -56,7 +58,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis { ...@@ -56,7 +58,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline pipeline = new Pipeline(); final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStages(noopFilters);
...@@ -66,14 +68,18 @@ public class MethodCallThroughputAnalysis2 extends Analysis { ...@@ -66,14 +68,18 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
// pipeline.getInputPort().pipe = new Pipe<Void>(); // pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object()); // pipeline.getInputPort().pipe.add(new Object());
pipeline.getOutputPort().pipe = new Pipe<Void>(); // pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() { final Runnable runnable = new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.onStart(); pipeline.onStart();
while (!(pipeline.getOutputPort().pipe.readLast() == Stage.END_SIGNAL)) {
pipeline.execute2(); CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
while (!(outputQueue.getTail() == Stage.END_SIGNAL)) {
outputQueue = pipeline.execute2(inputQueue);
} }
} }
}; };
... ...
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
***************************************************************************/ ***************************************************************************/
package teetime.examples.throughput.methodcall; package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -26,9 +28,16 @@ public class NoopFilter<T> extends AbstractStage<T, T> { ...@@ -26,9 +28,16 @@ public class NoopFilter<T> extends AbstractStage<T, T> {
return obj; return obj;
} }
// @Override
// public void execute3() {
// T element = this.getInputPort().receive();
// // this.getOutputPort().send(element);
// }
@Override @Override
public void execute3() { protected void execute4(final CommittableQueue<T> elements) {
T element = this.getInputPort().receive(); T element = elements.removeFromHead();
this.getOutputPort().send(element); this.send(element);
} }
} }
...@@ -17,6 +17,8 @@ package teetime.examples.throughput.methodcall; ...@@ -17,6 +17,8 @@ package teetime.examples.throughput.methodcall;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import teetime.util.list.CommittableQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -66,10 +68,27 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> { ...@@ -66,10 +68,27 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> {
this.inputObjectCreator = inputObjectCreator; this.inputObjectCreator = inputObjectCreator;
} }
// @Override
// protected void execute3() {
// if (this.numInputObjects == 0) {
// // this.getOutputPort().send((T) END_SIGNAL);
// return;
// }
//
// try {
// final T newObject = this.inputObjectCreator.call();
// this.numInputObjects--;
//
// // this.getOutputPort().send(newObject);
// } catch (final Exception e) {
// throw new IllegalStateException(e);
// }
// }
@Override @Override
protected void execute3() { protected void execute4(final CommittableQueue<Void> elements) {
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
this.getOutputPort().send((T) END_SIGNAL); this.send((T) END_SIGNAL);
return; return;
} }
...@@ -77,7 +96,7 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> { ...@@ -77,7 +96,7 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> {
final T newObject = this.inputObjectCreator.call(); final T newObject = this.inputObjectCreator.call();
this.numInputObjects--; this.numInputObjects--;
this.getOutputPort().send(newObject); this.send(newObject);
} catch (final Exception e) { } catch (final Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
... ...
......
...@@ -24,4 +24,9 @@ public class Pipe<T> { ...@@ -24,4 +24,9 @@ public class Pipe<T> {
public T readLast() { public T readLast() {
return this.elements.getTail(); return this.elements.getTail();
} }
public CommittableResizableArrayQueue<T> getElements() {
return this.elements;
}
} }
...@@ -4,13 +4,15 @@ import java.util.Arrays; ...@@ -4,13 +4,15 @@ import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
public class Pipeline implements Stage { import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements Stage<I, O> {
private Stage firstStage; private Stage firstStage;
private final List<Stage> intermediateStages = new LinkedList<Stage>(); private final List<Stage> intermediateStages = new LinkedList<Stage>();
private Stage lastStage; private Stage lastStage;
void setFirstStage(final Stage stage) { void setFirstStage(final Stage<I, ?> stage) {
this.firstStage = stage; this.firstStage = stage;
} }
...@@ -22,17 +24,17 @@ public class Pipeline implements Stage { ...@@ -22,17 +24,17 @@ public class Pipeline implements Stage {
this.intermediateStages.add(stage); this.intermediateStages.add(stage);
} }
void setLastStage(final Stage stage) { void setLastStage(final Stage<?, O> stage) {
this.lastStage = stage; this.lastStage = stage;
} }
@Override @Override
public void execute2() { public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
this.firstStage.execute2(); CommittableQueue queue = this.firstStage.execute2(elements);
for (Stage<?, ?> stage : this.intermediateStages) { for (Stage<?, ?> stage : this.intermediateStages) {
stage.execute2(); queue = stage.execute2(queue);
} }
this.lastStage.execute2(); return this.lastStage.execute2(queue);
} }
void onStart() { void onStart() {
...@@ -40,32 +42,32 @@ public class Pipeline implements Stage { ...@@ -40,32 +42,32 @@ public class Pipeline implements Stage {
// this.outputPort.pipe = pipe; // this.outputPort.pipe = pipe;
// this.firstStage.getInputPort().pipe = pipe; // this.firstStage.getInputPort().pipe = pipe;
Pipe pipe = new Pipe(); // Pipe pipe = new Pipe();
this.firstStage.getOutputPort().pipe = pipe; // this.firstStage.getOutputPort().pipe = pipe;
this.intermediateStages.get(0).getInputPort().pipe = pipe; // this.intermediateStages.get(0).getInputPort().pipe = pipe;
//
for (int i = 0; i < this.intermediateStages.size() - 1; i++) { // for (int i = 0; i < this.intermediateStages.size() - 1; i++) {
Stage left = this.intermediateStages.get(i); // Stage left = this.intermediateStages.get(i);
Stage right = this.intermediateStages.get(i + 1); // Stage right = this.intermediateStages.get(i + 1);
//
pipe = new Pipe(); // pipe = new Pipe();
left.getOutputPort().pipe = pipe; // left.getOutputPort().pipe = pipe;
right.getInputPort().pipe = pipe; // right.getInputPort().pipe = pipe;
} // }
//
pipe = new Pipe(); // pipe = new Pipe();
this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe; // this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe;
this.lastStage.getInputPort().pipe = pipe; // this.lastStage.getInputPort().pipe = pipe;
}
@Override
public InputPort getInputPort() {
return this.firstStage.getInputPort();
}
@Override
public OutputPort getOutputPort() {
return this.lastStage.getOutputPort();
} }
//
// @Override
// public InputPort getInputPort() {
// return this.firstStage.getInputPort();
// }
// @Override
// public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort();
// }
} }
package teetime.examples.throughput.methodcall; package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
public interface Stage<I, O> { public interface Stage<I, O> {
public static final Object END_SIGNAL = new Object(); public static final Object END_SIGNAL = new Object();
void execute2(); // CommittableQueue<O> execute2();
// InputPort<I> getInputPort();
InputPort<I> getInputPort(); CommittableQueue<O> execute2(CommittableQueue<I> elements);
OutputPort<O> getOutputPort(); // OutputPort<O> getOutputPort();
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.examples.throughput.methodcall; package teetime.examples.throughput.methodcall;
import teetime.examples.throughput.TimestampObject; import teetime.examples.throughput.TimestampObject;
import teetime.util.list.CommittableQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
...@@ -29,10 +30,17 @@ public class StartTimestampFilter extends AbstractStage<TimestampObject, Timesta ...@@ -29,10 +30,17 @@ public class StartTimestampFilter extends AbstractStage<TimestampObject, Timesta
return obj; return obj;
} }
// @Override
// public void execute3() {
// TimestampObject element = this.getInputPort().receive();
// element.setStartTimestamp(System.nanoTime());
// // this.getOutputPort().send(element);
// }
@Override @Override
public void execute3() { protected void execute4(final CommittableQueue<TimestampObject> elements) {
TimestampObject element = this.getInputPort().receive(); TimestampObject element = elements.removeFromHead();
element.setStartTimestamp(System.nanoTime()); element.setStartTimestamp(System.nanoTime());
this.getOutputPort().send(element); this.send(element);
} }
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.examples.throughput.methodcall; package teetime.examples.throughput.methodcall;
import teetime.examples.throughput.TimestampObject; import teetime.examples.throughput.TimestampObject;
import teetime.util.list.CommittableQueue;
/** /**
* @author Christian Wulf * @author Christian Wulf
...@@ -29,10 +30,17 @@ public class StopTimestampFilter extends AbstractStage<TimestampObject, Timestam ...@@ -29,10 +30,17 @@ public class StopTimestampFilter extends AbstractStage<TimestampObject, Timestam
return obj; return obj;
} }
// @Override
// public void execute3() {
// TimestampObject element = this.getInputPort().receive();
// element.setStopTimestamp(System.nanoTime());
// // this.getOutputPort().send(element);
// }
@Override @Override
public void execute3() { protected void execute4(final CommittableQueue<TimestampObject> elements) {
TimestampObject element = this.getInputPort().receive(); TimestampObject element = elements.removeFromHead();
element.setStopTimestamp(System.nanoTime()); element.setStopTimestamp(System.nanoTime());
this.getOutputPort().send(element); this.send(element);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment