From 92a3cf10a71919ba08771e48b9d43a37a232fce8 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 19 Jun 2014 09:36:35 +0200 Subject: [PATCH] fixing broken experiments --- .../framework/core/AbstractStage.java | 84 +------- .../framework/core/ConsumerStage.java | 15 -- .../methodcall/framework/core/Pipeline.java | 103 ++------- .../framework/core/ProducerStage.java | 11 +- .../methodcall/framework/core/Stage.java | 2 +- .../framework/core/StageWithPort.java | 14 -- .../methodcall/stage/CollectorSink.java | 15 -- .../variant/methodcall/stage/EndStage.java | 44 +--- .../variant/methodcall/stage/NoopFilter.java | 14 +- .../methodcall/stage/ObjectProducer.java | 31 +-- .../stage/StartTimestampFilter.java | 15 +- .../methodcall/stage/StopTimestampFilter.java | 9 +- .../framework/core/AbstractStage.java | 141 +++++++++++++ .../framework/core/ConsumerStage.java | 41 ++++ .../framework/core/InputPort.java | 4 +- .../framework/core/OutputPort.java | 4 +- .../framework/core/Pipeline.java | 196 ++++++++++++++++++ .../framework/core/ProducerStage.java | 37 ++++ .../framework/core/RunnableStage.java | 2 +- .../framework/core/StageWithPort.java | 38 ++++ .../framework/core/pipe/AbstractPipe.java | 2 +- .../framework/core/pipe/IPipe.java | 2 +- .../core/pipe/OrderedGrowableArrayPipe.java | 6 +- .../core/pipe/OrderedGrowablePipe.java | 6 +- .../framework/core/pipe/Pipe.java | 6 +- .../core/pipe/SingleElementPipe.java | 6 +- .../framework/core/pipe/SpScPipe.java | 6 +- .../core/pipe/UnorderedGrowablePipe.java | 6 +- .../stage/Clock.java | 10 +- .../stage/CollectorSink.java | 61 ++++++ .../stage/Delay.java | 12 +- .../stage/Distributor.java | 12 +- .../methodcallWithPorts/stage/EndStage.java | 86 ++++++++ .../stage/Merger.java | 12 +- .../methodcallWithPorts/stage/NoopFilter.java | 45 ++++ .../stage/ObjectProducer.java | 99 +++++++++ .../stage/Relay.java | 10 +- .../stage/Sink.java | 10 +- .../stage/StartTimestampFilter.java | 47 +++++ .../stage/StopTimestampFilter.java | 47 +++++ .../MethodCallThroughputAnalysis1.java | 7 +- .../MethodCallThroughputAnalysis2.java | 5 - ...odCallThoughputTimestampAnalysis9Test.java | 2 +- .../MethodCallThroughputAnalysis9.java | 2 +- ...dCallThoughputTimestampAnalysis10Test.java | 2 +- .../MethodCallThroughputAnalysis10.java | 2 +- ...dCallThoughputTimestampAnalysis11Test.java | 2 +- .../MethodCallThroughputAnalysis11.java | 18 +- src/test/java/what tests work.txt | 17 ++ 49 files changed, 963 insertions(+), 405 deletions(-) delete mode 100644 src/main/java/teetime/variant/methodcall/framework/core/StageWithPort.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/InputPort.java (72%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/OutputPort.java (65%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/RunnableStage.java (84%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/AbstractPipe.java (85%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/IPipe.java (80%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/OrderedGrowableArrayPipe.java (83%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/OrderedGrowablePipe.java (80%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/Pipe.java (86%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/SingleElementPipe.java (81%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/SpScPipe.java (78%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/framework/core/pipe/UnorderedGrowablePipe.java (92%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Clock.java (85%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Delay.java (82%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Distributor.java (87%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Merger.java (86%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Relay.java (80%) rename src/main/java/teetime/variant/{methodcall => methodcallWithPorts}/stage/Sink.java (57%) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java (97%) rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment09/MethodCallThroughputAnalysis9.java (98%) rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java (97%) rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment10/MethodCallThroughputAnalysis10.java (98%) rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java (97%) rename src/test/java/teetime/variant/{methodcall => methodcallWithPorts}/examples/experiment11/MethodCallThroughputAnalysis11.java (87%) create mode 100644 src/test/java/what tests work.txt diff --git a/src/main/java/teetime/variant/methodcall/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcall/framework/core/AbstractStage.java index 947f68b5..3fa3175e 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcall/framework/core/AbstractStage.java @@ -1,20 +1,17 @@ package teetime.variant.methodcall.framework.core; import teetime.util.list.CommittableQueue; +import teetime.util.list.CommittableResizableArrayQueue; -public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { +public abstract class AbstractStage<I, O> implements Stage<I, O> { - private final InputPort<I> inputPort = new InputPort<I>(); - private final OutputPort<O> outputPort = new OutputPort<O>(); - - // protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); - // private final CommittableQueue<O> outputElements = null; + protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); private Stage<?, ?> parentStage; private int index; - private StageWithPort<?, ?> successor; + private Stage<?, ?> successor; private boolean reschedulable; @@ -24,7 +21,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { if (result == null) { return null; } - StageWithPort<?, ?> next = this.next(); + Stage<?, ?> next = this.next(); // if (next != null) { // return next.executeRecursively(result); // } else { @@ -33,16 +30,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { return next.executeRecursively(result); } - @Override - public InputPort<I> getInputPort() { - return this.inputPort; - } - - @Override - public OutputPort<O> getOutputPort() { - return this.outputPort; - } - @Override public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { // pass through the end signal @@ -63,61 +50,17 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { this.execute4(elements); - // this.outputElements.commit(); + this.outputElements.commit(); - // return this.outputElements; - return null; + return this.outputElements; } - // @Override - // public void executeWithPorts() { - // CommittableQueue execute; - // do { - // // execute = this.next().execute2(this.outputElements); - // // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); - // this.next().executeWithPorts(); - // } while (this.next().isReschedulable()); - // } - protected abstract void execute4(CommittableQueue<I> elements); - protected abstract void execute5(I element); - - protected final void send(final O element) { - // this.outputElements.addToTailUncommitted(element); - // this.outputElements.commit(); - - this.getOutputPort().send(element); - - // CommittableQueue execute; - - StageWithPort<?, ?> next = this.next(); - do { - // execute = this.next().execute2(this.outputElements); - // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); - next.executeWithPorts(); - // System.out.println("Executed " + this.next().getClass().getSimpleName()); - } while (next.isReschedulable()); - // } while (this.next().getInputPort().pipe.size() > 0); - // } while (execute.size() > 0); + protected void send(final O element) { + this.outputElements.addToTailUncommitted(element); } - // @Override - // public SchedulingInformation getSchedulingInformation() { - // return this.schedulingInformation; - // } - - // public void disable() { - // this.schedulingInformation.setActive(false); - // this.fireOnDisable(); - // } - - // private void fireOnDisable() { - // if (this.listener != null) { - // this.listener.onDisable(this, this.index); - // } - // } - @Override public void onStart() { // empty default implementation @@ -135,20 +78,15 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { } @Override - public StageWithPort<?, ?> next() { + public Stage<?, ?> next() { return this.successor; } @Override - public void setSuccessor(final StageWithPort<?, ?> successor) { + public void setSuccessor(final Stage<? super O, ?> successor) { this.successor = successor; } - @Override - public void setSuccessor(final Stage<?, ?> successor) { - throw new IllegalStateException(); - } - @Override public boolean isReschedulable() { return this.reschedulable; diff --git a/src/main/java/teetime/variant/methodcall/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcall/framework/core/ConsumerStage.java index ea72ab47..2ea5b659 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/ConsumerStage.java +++ b/src/main/java/teetime/variant/methodcall/framework/core/ConsumerStage.java @@ -18,21 +18,6 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { return output; } - @Override - public void executeWithPorts() { - I element = this.getInputPort().receive(); - - this.setReschedulable(this.getInputPort().getPipe().size() > 0); - - this.execute5(element); - - // this.send(result); - - // if (!this.getOutputPort().pipe.isEmpty()) { - // super.executeWithPorts(); - // } - } - @Override public void onIsPipelineHead() { // do nothing diff --git a/src/main/java/teetime/variant/methodcall/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcall/framework/core/Pipeline.java index 26567378..0309da6e 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcall/framework/core/Pipeline.java @@ -7,33 +7,31 @@ import java.util.List; import teetime.util.list.CommittableQueue; import teetime.variant.methodcall.stage.EndStage; -public class Pipeline<I, O> implements StageWithPort<I, O> { +public class Pipeline<I, O> implements Stage<I, O> { - private StageWithPort<I, ?> firstStage; - private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); - private StageWithPort<?, O> lastStage; + private Stage<I, ?> firstStage; + private final List<Stage<?, ?>> intermediateStages = new LinkedList<Stage<?, ?>>(); + private Stage<?, O> lastStage; - private StageWithPort<?, ?>[] stages; + private Stage<?, ?>[] stages; private Stage<?, ?> parentStage; private int index; - private int startIndex; private boolean reschedulable; - private int firstStageIndex; - public void setFirstStage(final StageWithPort<I, ?> stage) { + public void setFirstStage(final Stage<I, ?> stage) { this.firstStage = stage; } - public void addIntermediateStages(final StageWithPort<?, ?>... stages) { + public void addIntermediateStages(final Stage<?, ?>... stages) { this.intermediateStages.addAll(Arrays.asList(stages)); } - public void addIntermediateStage(final StageWithPort stage) { + public void addIntermediateStage(final Stage<?, ?> stage) { this.intermediateStages.add(stage); } - public void setLastStage(final StageWithPort<?, O> stage) { + public void setLastStage(final Stage<?, O> stage) { this.lastStage = stage; } @@ -53,34 +51,10 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // queue = stage.execute2(queue); // } - this.firstStage.execute2(elements); + queue = this.firstStage.execute2(elements); this.setReschedulable(this.firstStage.isReschedulable()); - return queue; - } - - @Override - public void executeWithPorts() { - StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex]; - firstStage.executeWithPorts(); - this.updateRescheduable(firstStage); - // this.setReschedulable(stage.isReschedulable()); - } - - private final void updateRescheduable(final Stage<?, ?> stage) { - Stage<?, ?> currentStage = stage; - while (!currentStage.isReschedulable()) { - this.firstStageIndex++; - currentStage = currentStage.next(); - if (currentStage == null) { // loop reaches the last stage - this.setReschedulable(false); - this.cleanUp(); - return; - } - currentStage.onIsPipelineHead(); - // System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName()); - } - this.setReschedulable(true); + return queue; } @Override @@ -90,7 +64,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public Object executeRecursively(final Object element) { - return this.stages[0].executeRecursively(element); + return this.firstStage.executeRecursively(element); } @Override @@ -117,44 +91,38 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // this.lastStage.getInputPort().pipe = pipe; int size = 1 + this.intermediateStages.size() + 1; - this.stages = new StageWithPort[size]; + this.stages = new Stage[size]; this.stages[0] = this.firstStage; for (int i = 0; i < this.intermediateStages.size(); i++) { - StageWithPort<?, ?> stage = this.intermediateStages.get(i); + Stage<?, ?> stage = this.intermediateStages.get(i); this.stages[1 + i] = stage; } this.stages[this.stages.length - 1] = this.lastStage; for (int i = 0; i < this.stages.length; i++) { - StageWithPort<?, ?> stage = this.stages[i]; + // Stage<?, ?> stage = this.stages[i]; // stage.setParentStage(this, i); // stage.setListener(this); } for (int i = 0; i < this.stages.length - 1; i++) { - StageWithPort<?, ?> stage = this.stages[i]; + Stage stage = this.stages[i]; stage.setSuccessor(this.stages[i + 1]); } this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); - for (StageWithPort<?, ?> stage : this.stages) { + for (Stage<?, ?> stage : this.stages) { stage.onStart(); } } - // - // @Override - // public InputPort getInputPort() { - // return this.firstStage.getInputPort(); - // } - @Override - public Stage getParentStage() { + public Stage<?, ?> getParentStage() { return this.parentStage; } @Override - public void setParentStage(final Stage parentStage, final int index) { + public void setParentStage(final Stage<?, ?> parentStage, final int index) { this.index = index; this.parentStage = parentStage; } @@ -170,12 +138,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { } @Override - public void setSuccessor(final Stage<?, ?> successor) { - throw new IllegalStateException(); - } - - @Override - public void setSuccessor(final StageWithPort<?, ?> successor) { + public void setSuccessor(final Stage<? super O, ?> successor) { throw new IllegalStateException(); } @@ -188,30 +151,4 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.reschedulable = reschedulable; } - @Override - public InputPort<I> getInputPort() { - return this.firstStage.getInputPort(); - } - - @Override - public OutputPort<O> getOutputPort() { - return this.lastStage.getOutputPort(); - } - - // TODO remove since it does not increase performances - private void cleanUp() { - for (int i = 0; i < this.stages.length; i++) { - StageWithPort<?, ?> stage = this.stages[i]; - stage.setParentStage(null, i); - // stage.setListener(null); - stage.setSuccessor(null); - } - - this.firstStage = null; - this.intermediateStages.clear(); - this.lastStage = null; - - System.out.println("cleaned up"); - } - } diff --git a/src/main/java/teetime/variant/methodcall/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcall/framework/core/ProducerStage.java index b032b0e3..bd7707be 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/ProducerStage.java +++ b/src/main/java/teetime/variant/methodcall/framework/core/ProducerStage.java @@ -14,21 +14,12 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { boolean outputIsEmpty = outputElements.isEmpty(); if (outputIsEmpty) { - this.getOutputPort().getPipe().close(); + // this.getOutputPort().getPipe().close(); } return outputElements; } - @Override - public void executeWithPorts() { - this.execute5(null); - - // if (!this.getOutputPort().pipe.isEmpty()) { - // super.executeWithPorts(); - // } - } - @Override public void onIsPipelineHead() { // do nothing diff --git a/src/main/java/teetime/variant/methodcall/framework/core/Stage.java b/src/main/java/teetime/variant/methodcall/framework/core/Stage.java index 95ac0b02..c8914350 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/Stage.java +++ b/src/main/java/teetime/variant/methodcall/framework/core/Stage.java @@ -22,7 +22,7 @@ public interface Stage<I, O> { Stage<?, ?> next(); - void setSuccessor(Stage<?, ?> successor); + void setSuccessor(Stage<? super O, ?> successor); boolean isReschedulable(); diff --git a/src/main/java/teetime/variant/methodcall/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcall/framework/core/StageWithPort.java deleted file mode 100644 index 9246317e..00000000 --- a/src/main/java/teetime/variant/methodcall/framework/core/StageWithPort.java +++ /dev/null @@ -1,14 +0,0 @@ -package teetime.variant.methodcall.framework.core; - -public interface StageWithPort<I, O> extends Stage<I, O> { - - InputPort<I> getInputPort(); - - OutputPort<O> getOutputPort(); - - void executeWithPorts(); - - // void executeWithPorts(Object element); - - void setSuccessor(StageWithPort<?, ?> successor); -} diff --git a/src/main/java/teetime/variant/methodcall/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcall/stage/CollectorSink.java index 4bc890e0..49ea2928 100644 --- a/src/main/java/teetime/variant/methodcall/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcall/stage/CollectorSink.java @@ -50,24 +50,9 @@ public class CollectorSink<T> extends ConsumerStage<T, Object> { System.out.println("size: " + this.elements.size()); } - // @Override - // public void execute3() { - // T element = this.getInputPort().receive(); - // - // this.elements.add(element); - // if ((this.elements.size() % THRESHOLD) == 0) { - // System.out.println("size: " + this.elements.size()); - // } - // } - @Override protected void execute4(final CommittableQueue<T> elements) { T element = elements.removeFromHead(); - this.execute5(element); - } - - @Override - protected void execute5(final T element) { this.elements.add(element); if ((this.elements.size() % THRESHOLD) == 0) { System.out.println("size: " + this.elements.size()); diff --git a/src/main/java/teetime/variant/methodcall/stage/EndStage.java b/src/main/java/teetime/variant/methodcall/stage/EndStage.java index f3c12625..dc791a44 100644 --- a/src/main/java/teetime/variant/methodcall/stage/EndStage.java +++ b/src/main/java/teetime/variant/methodcall/stage/EndStage.java @@ -5,17 +5,9 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; import teetime.variant.methodcall.framework.core.Stage; -import teetime.variant.methodcall.framework.core.StageWithPort; -public class EndStage<T> implements StageWithPort<T, T> { - - @Override - public Object executeRecursively(final Object element) { - return this.execute(element); - } +public class EndStage<T> implements Stage<T, T> { public int count; public ConstructorClosure<?> closure; @@ -23,7 +15,7 @@ public class EndStage<T> implements StageWithPort<T, T> { @Override public T execute(final Object element) { - return (T) element; + return null; } @Override @@ -44,7 +36,7 @@ public class EndStage<T> implements StageWithPort<T, T> { } @Override - public void setParentStage(final Stage parentStage, final int index) { + public void setParentStage(final Stage<?, ?> parentStage, final int index) { // TODO Auto-generated method stub } @@ -56,48 +48,26 @@ public class EndStage<T> implements StageWithPort<T, T> { } @Override - public void setSuccessor(final Stage<?, ?> successor) { + public void setSuccessor(final Stage<? super T, ?> successor) { // TODO Auto-generated method stub } @Override public boolean isReschedulable() { - // TODO Auto-generated method stub return false; } @Override - public InputPort<T> getInputPort() { - // TODO Auto-generated method stub - return null; - } - - @Override - public OutputPort<T> getOutputPort() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void executeWithPorts() { - // this.getInputPort().receive(); // just consume - // do nothing - // this.count++; - // Object r = this.closure.execute(null); - // this.list.add(r); - } - - @Override - public void setSuccessor(final StageWithPort<?, ?> successor) { + public void onStart() { // TODO Auto-generated method stub } @Override - public void onStart() { + public Object executeRecursively(final Object element) { // TODO Auto-generated method stub - + return null; } } diff --git a/src/main/java/teetime/variant/methodcall/stage/NoopFilter.java b/src/main/java/teetime/variant/methodcall/stage/NoopFilter.java index ace51b02..cdd44237 100644 --- a/src/main/java/teetime/variant/methodcall/stage/NoopFilter.java +++ b/src/main/java/teetime/variant/methodcall/stage/NoopFilter.java @@ -31,21 +31,11 @@ public class NoopFilter<T> extends ConsumerStage<T, T> { return (T) obj; } - // @Override - // public void execute3() { - // T element = this.getInputPort().receive(); - // // this.getOutputPort().send(element); - // } - @Override protected void execute4(final CommittableQueue<T> elements) { T element = elements.removeFromHead(); - this.execute5(element); - } - - @Override - protected void execute5(final T element) { - this.send(element); // "send" calls the next stage and so on + // this.send(element); // "send" calls the next stage and so on + throw new IllegalStateException(); } } diff --git a/src/main/java/teetime/variant/methodcall/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcall/stage/ObjectProducer.java index efd8cbdc..0a50a69d 100644 --- a/src/main/java/teetime/variant/methodcall/stage/ObjectProducer.java +++ b/src/main/java/teetime/variant/methodcall/stage/ObjectProducer.java @@ -40,7 +40,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { @Override public T execute(final Object element) { // if (this.numInputObjects == 0) { - // this.setReschedulable(false); + // // this.setReschedulable(false); // return null; // } @@ -75,30 +75,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { this.inputObjectCreator = inputObjectCreator; } - // @Override - // protected void execute3() { - // if (this.numInputObjects == 0) { - // // this.getOutputPort().send((T) END_SIGNAL); - // return; - // } - // - // try { - // final T newObject = this.inputObjectCreator.call(); - // this.numInputObjects--; - // - // // this.getOutputPort().send(newObject); - // } catch (final Exception e) { - // throw new IllegalStateException(e); - // } - // } - @Override protected void execute4(final CommittableQueue<Void> elements) { - this.execute5(null); - } - - @Override - protected void execute5(final Void element) { T newObject = null; newObject = this.inputObjectCreator.create(); this.numInputObjects--; @@ -110,12 +88,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); this.send(newObject); + // throw new IllegalStateException(); } - // @Override - // public void onIsPipelineHead() { - // // this.getOutputPort().pipe = null; // no performance increase - // super.onIsPipelineHead(); - // } - } diff --git a/src/main/java/teetime/variant/methodcall/stage/StartTimestampFilter.java b/src/main/java/teetime/variant/methodcall/stage/StartTimestampFilter.java index 3619f24f..754de3dc 100644 --- a/src/main/java/teetime/variant/methodcall/stage/StartTimestampFilter.java +++ b/src/main/java/teetime/variant/methodcall/stage/StartTimestampFilter.java @@ -33,22 +33,11 @@ public class StartTimestampFilter extends ConsumerStage<TimestampObject, Timesta return timestampObject; } - // @Override - // public void execute3() { - // TimestampObject element = this.getInputPort().receive(); - // element.setStartTimestamp(System.nanoTime()); - // // this.getOutputPort().send(element); - // } - @Override protected void execute4(final CommittableQueue<TimestampObject> elements) { TimestampObject element = elements.removeFromHead(); - this.execute5(element); - } - - @Override - protected void execute5(final TimestampObject element) { element.setStartTimestamp(System.nanoTime()); - this.send(element); + // this.send(element); + throw new IllegalStateException(); } } diff --git a/src/main/java/teetime/variant/methodcall/stage/StopTimestampFilter.java b/src/main/java/teetime/variant/methodcall/stage/StopTimestampFilter.java index 07ee037a..b855088a 100644 --- a/src/main/java/teetime/variant/methodcall/stage/StopTimestampFilter.java +++ b/src/main/java/teetime/variant/methodcall/stage/StopTimestampFilter.java @@ -43,12 +43,9 @@ public class StopTimestampFilter extends ConsumerStage<TimestampObject, Timestam @Override protected void execute4(final CommittableQueue<TimestampObject> elements) { TimestampObject element = elements.removeFromHead(); - this.execute5(element); - } - - @Override - protected void execute5(final TimestampObject element) { element.setStopTimestamp(System.nanoTime()); - this.send(element); + // this.send(element); + throw new IllegalStateException(); } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java new file mode 100644 index 00000000..ab6fc894 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -0,0 +1,141 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import teetime.util.list.CommittableQueue; + +public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { + + private final InputPort<I> inputPort = new InputPort<I>(); + private final OutputPort<O> outputPort = new OutputPort<O>(); + + // protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); + // private final CommittableQueue<O> outputElements = null; + + private StageWithPort<?, ?> parentStage; + + private int index; + + private StageWithPort<?, ?> successor; + + private boolean reschedulable; + + @Override + public InputPort<I> getInputPort() { + return this.inputPort; + } + + @Override + public OutputPort<O> getOutputPort() { + return this.outputPort; + } + + @Override + public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + // pass through the end signal + // InputPort<I> port = this.getInputPort(); + // if (elements != null) { + // // I element = port.read(); + // // I element = elements.getTail(); + // // if (element == END_SIGNAL) { + // // this.send((O) END_SIGNAL); + // // } else { + // // // elements = this.getInputPort().pipe.getElements(); + // // } + // + // this.execute4(elements); + // } else { + // throw new IllegalStateException(); + // } + + this.execute4(elements); + + // this.outputElements.commit(); + + // return this.outputElements; + return null; + } + + // @Override + // public void executeWithPorts() { + // CommittableQueue execute; + // do { + // // execute = this.next().execute2(this.outputElements); + // // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); + // this.next().executeWithPorts(); + // } while (this.next().isReschedulable()); + // } + + protected abstract void execute4(CommittableQueue<I> elements); + + protected abstract void execute5(I element); + + protected final void send(final O element) { + // this.outputElements.addToTailUncommitted(element); + // this.outputElements.commit(); + + this.getOutputPort().send(element); + + // CommittableQueue execute; + + StageWithPort<?, ?> next = this.next(); + do { + // execute = this.next().execute2(this.outputElements); + // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); + next.executeWithPorts(); + // System.out.println("Executed " + this.next().getClass().getSimpleName()); + } while (next.isReschedulable()); + // } while (this.next().getInputPort().pipe.size() > 0); + // } while (execute.size() > 0); + } + + // @Override + // public SchedulingInformation getSchedulingInformation() { + // return this.schedulingInformation; + // } + + // public void disable() { + // this.schedulingInformation.setActive(false); + // this.fireOnDisable(); + // } + + // private void fireOnDisable() { + // if (this.listener != null) { + // this.listener.onDisable(this, this.index); + // } + // } + + @Override + public void onStart() { + // empty default implementation + } + + @Override + public StageWithPort<?, ?> getParentStage() { + return this.parentStage; + } + + @Override + public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { + this.index = index; + this.parentStage = parentStage; + } + + @Override + public StageWithPort<?, ?> next() { + return this.successor; + } + + @Override + public void setSuccessor(final StageWithPort<? super O, ?> successor) { + this.successor = successor; + } + + @Override + public boolean isReschedulable() { + return this.reschedulable; + } + + public void setReschedulable(final boolean reschedulable) { + this.reschedulable = reschedulable; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java new file mode 100644 index 00000000..1f4f6ab8 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java @@ -0,0 +1,41 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import teetime.util.list.CommittableQueue; + +public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { + + @Override + public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + // the following code block does not harm the performance + // boolean inputIsEmpty = elements.isEmpty(); + // if (inputIsEmpty) { + // this.disable(); + // return this.outputElements; + // } + + CommittableQueue<O> output = super.execute2(elements); + this.setReschedulable(!elements.isEmpty()); // costs ~1200 ns on chw-work (not reproducible) + return output; + } + + @Override + public void executeWithPorts() { + I element = this.getInputPort().receive(); + + this.setReschedulable(this.getInputPort().getPipe().size() > 0); + + this.execute5(element); + + // this.send(result); + + // if (!this.getOutputPort().pipe.isEmpty()) { + // super.executeWithPorts(); + // } + } + + @Override + public void onIsPipelineHead() { + // do nothing + } + +} diff --git a/src/main/java/teetime/variant/methodcall/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java similarity index 72% rename from src/main/java/teetime/variant/methodcall/framework/core/InputPort.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index f74b2bfc..95f674f2 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -1,6 +1,6 @@ -package teetime.variant.methodcall.framework.core; +package teetime.variant.methodcallWithPorts.framework.core; -import teetime.variant.methodcall.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public class InputPort<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java similarity index 65% rename from src/main/java/teetime/variant/methodcall/framework/core/OutputPort.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index b7612df5..b426506e 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -1,6 +1,6 @@ -package teetime.variant.methodcall.framework.core; +package teetime.variant.methodcallWithPorts.framework.core; -import teetime.variant.methodcall.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public class OutputPort<T> { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java new file mode 100644 index 00000000..4ba07ebb --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -0,0 +1,196 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.stage.EndStage; + +public class Pipeline<I, O> implements StageWithPort<I, O> { + + private StageWithPort<I, ?> firstStage; + private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); + private StageWithPort<?, O> lastStage; + + private StageWithPort<?, ?>[] stages; + private StageWithPort<?, ?> parentStage; + private int index; + private int startIndex; + + private boolean reschedulable; + private int firstStageIndex; + + public void setFirstStage(final StageWithPort<I, ?> stage) { + this.firstStage = stage; + } + + public void addIntermediateStages(final StageWithPort<?, ?>... stages) { + this.intermediateStages.addAll(Arrays.asList(stages)); + } + + public void addIntermediateStage(final StageWithPort stage) { + this.intermediateStages.add(stage); + } + + public void setLastStage(final StageWithPort<?, O> stage) { + this.lastStage = stage; + } + + @Override + public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + // CommittableQueue queue = this.firstStage.execute2(elements); + // for (Stage<?, ?> stage : this.intermediateStages) { + // queue = stage.execute2(queue); + // } + // return this.lastStage.execute2(queue); + + // below is faster than above (probably because of the instantiation of a list iterator in each (!) execution) + CommittableQueue queue = elements; + + // for (int i = this.startIndex; i < this.stages.length; i++) { + // Stage<?, ?> stage = this.stages[i]; + // queue = stage.execute2(queue); + // } + + this.firstStage.execute2(elements); + this.setReschedulable(this.firstStage.isReschedulable()); + return queue; + } + + @Override + public void executeWithPorts() { + StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex]; + firstStage.executeWithPorts(); + + this.updateRescheduable(firstStage); + // this.setReschedulable(stage.isReschedulable()); + } + + private final void updateRescheduable(final StageWithPort<?, ?> stage) { + StageWithPort<?, ?> currentStage = stage; + while (!currentStage.isReschedulable()) { + this.firstStageIndex++; + currentStage = currentStage.next(); + if (currentStage == null) { // loop reaches the last stage + this.setReschedulable(false); + this.cleanUp(); + return; + } + currentStage.onIsPipelineHead(); + // System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName()); + } + this.setReschedulable(true); + } + + @Override + public void onIsPipelineHead() { + // do nothing + } + + @Override + public void onStart() { + // Pipe pipe = new Pipe(); + // this.outputPort.pipe = pipe; + // this.firstStage.getInputPort().pipe = pipe; + + // Pipe pipe = new Pipe(); + // this.firstStage.getOutputPort().pipe = pipe; + // this.intermediateStages.get(0).getInputPort().pipe = pipe; + // + // for (int i = 0; i < this.intermediateStages.size() - 1; i++) { + // Stage left = this.intermediateStages.get(i); + // Stage right = this.intermediateStages.get(i + 1); + // + // pipe = new Pipe(); + // left.getOutputPort().pipe = pipe; + // right.getInputPort().pipe = pipe; + // } + // + // pipe = new Pipe(); + // this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe; + // this.lastStage.getInputPort().pipe = pipe; + + int size = 1 + this.intermediateStages.size() + 1; + this.stages = new StageWithPort[size]; + this.stages[0] = this.firstStage; + for (int i = 0; i < this.intermediateStages.size(); i++) { + StageWithPort<?, ?> stage = this.intermediateStages.get(i); + this.stages[1 + i] = stage; + } + this.stages[this.stages.length - 1] = this.lastStage; + + for (int i = 0; i < this.stages.length; i++) { + // StageWithPort<?, ?> stage = this.stages[i]; + // stage.setParentStage(this, i); + // stage.setListener(this); + } + + for (int i = 0; i < this.stages.length - 1; i++) { + StageWithPort stage = this.stages[i]; + stage.setSuccessor(this.stages[i + 1]); + } + this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); + + for (StageWithPort<?, ?> stage : this.stages) { + stage.onStart(); + } + } + + @Override + public StageWithPort<?, ?> getParentStage() { + return this.parentStage; + } + + @Override + public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { + this.index = index; + this.parentStage = parentStage; + } + + @Override + public StageWithPort<?, ?> next() { + throw new IllegalStateException(); + } + + @Override + public void setSuccessor(final StageWithPort<? super O, ?> successor) { + throw new IllegalStateException(); + } + + @Override + public boolean isReschedulable() { + return this.reschedulable; + } + + public void setReschedulable(final boolean reschedulable) { + this.reschedulable = reschedulable; + } + + @Override + public InputPort<I> getInputPort() { + return this.firstStage.getInputPort(); + } + + @Override + public OutputPort<O> getOutputPort() { + return this.lastStage.getOutputPort(); + } + + // TODO remove since it does not increase performances + private void cleanUp() { + for (int i = 0; i < this.stages.length; i++) { + StageWithPort<?, ?> stage = this.stages[i]; + stage.setParentStage(null, i); + // stage.setListener(null); + stage.setSuccessor(null); + } + + this.firstStage = null; + this.intermediateStages.clear(); + this.lastStage = null; + + System.out.println("cleaned up"); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java new file mode 100644 index 00000000..16dabc4c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java @@ -0,0 +1,37 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import teetime.util.list.CommittableQueue; + +public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { + + public ProducerStage() { + this.setReschedulable(true); + } + + @Override + public CommittableQueue<O> execute2(final CommittableQueue<I> elements) { + CommittableQueue<O> outputElements = super.execute2(elements); + + boolean outputIsEmpty = outputElements.isEmpty(); + if (outputIsEmpty) { + this.getOutputPort().getPipe().close(); + } + + return outputElements; + } + + @Override + public void executeWithPorts() { + this.execute5(null); + + // if (!this.getOutputPort().pipe.isEmpty()) { + // super.executeWithPorts(); + // } + } + + @Override + public void onIsPipelineHead() { + // do nothing + } + +} diff --git a/src/main/java/teetime/variant/methodcall/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java similarity index 84% rename from src/main/java/teetime/variant/methodcall/framework/core/RunnableStage.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 61e9c6b4..c7435b03 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcall.framework.core; +package teetime.variant.methodcallWithPorts.framework.core; public class RunnableStage implements Runnable { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java new file mode 100644 index 00000000..d8ee2839 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -0,0 +1,38 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import teetime.util.list.CommittableQueue; + +public interface StageWithPort<I, O> { + + InputPort<I> getInputPort(); + + OutputPort<O> getOutputPort(); + + void executeWithPorts(); + + // void executeWithPorts(Object element); + + // O execute(Object element); + + // CommittableQueue<O> execute2(); + + CommittableQueue<O> execute2(CommittableQueue<I> elements); + + // SchedulingInformation getSchedulingInformation(); + + StageWithPort<?, ?> getParentStage(); + + void setParentStage(StageWithPort<?, ?> parentStage, int index); + + // void setListener(OnDisableListener listener); + + StageWithPort<?, ?> next(); + + void setSuccessor(StageWithPort<? super O, ?> successor); + + boolean isReschedulable(); + + void onIsPipelineHead(); + + void onStart(); +} diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java similarity index 85% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/AbstractPipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index 390443af..dc56e3c2 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java similarity index 80% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/IPipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 3551eb1f..0cb86e45 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; public interface IPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java similarity index 83% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowableArrayPipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index 7989ba04..26c8a884 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -1,8 +1,8 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.util.concurrent.workstealing.CircularArray; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java similarity index 80% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowablePipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index 124d1c7c..4c36c839 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -1,9 +1,9 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; import java.util.LinkedList; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class OrderedGrowablePipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java similarity index 86% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/Pipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index e710e6fa..79321208 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -1,8 +1,8 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.util.list.CommittableResizableArrayQueue; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class Pipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java similarity index 81% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/SingleElementPipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index c7f8053f..f4dd5c6b 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -1,7 +1,7 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; //public class SingleElementPipe<T> implements IPipe<T> { public class SingleElementPipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java similarity index 78% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/SpScPipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 2856c165..54869f0f 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -1,8 +1,8 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.util.concurrent.spsc.FFBufferOrdered3; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class SpScPipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java similarity index 92% rename from src/main/java/teetime/variant/methodcall/framework/core/pipe/UnorderedGrowablePipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index fe5359ab..48c2afa1 100644 --- a/src/main/java/teetime/variant/methodcall/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -1,7 +1,7 @@ -package teetime.variant.methodcall.framework.core.pipe; +package teetime.variant.methodcallWithPorts.framework.core.pipe; -import teetime.variant.methodcall.framework.core.InputPort; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcall/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java similarity index 85% rename from src/main/java/teetime/variant/methodcall/stage/Clock.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index 7479e205..70ba4d7a 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -1,7 +1,7 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.ProducerStage; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; public class Clock extends ProducerStage<Void, Long> { @@ -10,12 +10,6 @@ public class Clock extends ProducerStage<Void, Long> { private long initialDelayInMs; private long intervalDelayInMs; - @Override - public Long execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override protected void execute4(final CommittableQueue<Void> elements) { // TODO Auto-generated method stub diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java new file mode 100644 index 00000000..c13fa894 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -0,0 +1,61 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage; + +import java.util.List; + +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class CollectorSink<T> extends ConsumerStage<T, Object> { + + private static final int THRESHOLD = 10000; + + private final List<T> elements; + + public CollectorSink(final List<T> list) { + this.elements = list; + } + + @Override + public void onIsPipelineHead() { + System.out.println("size: " + this.elements.size()); + } + + @Override + protected void execute4(final CommittableQueue<T> elements) { + T element = elements.removeFromHead(); + this.execute5(element); + } + + @Override + protected void execute5(final T element) { + this.elements.add(element); + if ((this.elements.size() % THRESHOLD) == 0) { + System.out.println("size: " + this.elements.size()); + } + + if (this.elements.size() > 90000) { + // System.out.println("size > 90000: " + this.elements.size()); + } + } + +} diff --git a/src/main/java/teetime/variant/methodcall/stage/Delay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java similarity index 82% rename from src/main/java/teetime/variant/methodcall/stage/Delay.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java index f5103209..d27bc6b6 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Delay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java @@ -1,8 +1,8 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.AbstractStage; -import teetime.variant.methodcall.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; public class Delay<I> extends AbstractStage<I, I> { @@ -37,12 +37,6 @@ public class Delay<I> extends AbstractStage<I, I> { this.setReschedulable(true); } - @Override - public I execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override protected void execute4(final CommittableQueue<I> elements) { // TODO Auto-generated method stub diff --git a/src/main/java/teetime/variant/methodcall/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java similarity index 87% rename from src/main/java/teetime/variant/methodcall/stage/Distributor.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java index ea56a54b..5d8be6cd 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java @@ -1,12 +1,12 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import java.util.ArrayList; import java.util.List; import teetime.util.concurrent.spsc.Pow2; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.AbstractStage; -import teetime.variant.methodcall.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public final class Distributor<T> extends AbstractStage<T, T> { @@ -21,12 +21,6 @@ public final class Distributor<T> extends AbstractStage<T, T> { // private int mask; - @Override - public T execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override protected void execute4(final CommittableQueue<T> elements) { // TODO Auto-generated method stub diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java new file mode 100644 index 00000000..24d2907c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java @@ -0,0 +1,86 @@ +package teetime.variant.methodcallWithPorts.stage; + +import java.util.LinkedList; +import java.util.List; + +import teetime.util.ConstructorClosure; +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; + +public class EndStage<T> implements StageWithPort<T, T> { + + public int count; + public ConstructorClosure<?> closure; + public List<Object> list = new LinkedList<Object>(); + + @Override + public void onIsPipelineHead() { + // do nothing + } + + @Override + public CommittableQueue<T> execute2(final CommittableQueue<T> elements) { + // TODO Auto-generated method stub + return null; + } + + @Override + public StageWithPort getParentStage() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setParentStage(final StageWithPort parentStage, final int index) { + // TODO Auto-generated method stub + + } + + @Override + public StageWithPort next() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setSuccessor(final StageWithPort<? super T, ?> successor) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isReschedulable() { + // TODO Auto-generated method stub + return false; + } + + @Override + public InputPort<T> getInputPort() { + // TODO Auto-generated method stub + return null; + } + + @Override + public OutputPort<T> getOutputPort() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void executeWithPorts() { + // this.getInputPort().receive(); // just consume + // do nothing + // this.count++; + // Object r = this.closure.execute(null); + // this.list.add(r); + } + + @Override + public void onStart() { + // TODO Auto-generated method stub + + } + +} diff --git a/src/main/java/teetime/variant/methodcall/stage/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java similarity index 86% rename from src/main/java/teetime/variant/methodcall/stage/Merger.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java index 0c5a9f31..aae8c293 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java @@ -1,12 +1,12 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import java.util.ArrayList; import java.util.List; import teetime.util.concurrent.spsc.Pow2; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.AbstractStage; -import teetime.variant.methodcall.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; public class Merger<T> extends AbstractStage<T, T> { @@ -17,12 +17,6 @@ public class Merger<T> extends AbstractStage<T, T> { private int size; private InputPort<T>[] inputPorts; - @Override - public T execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override protected void execute4(final CommittableQueue<T> elements) { // TODO Auto-generated method stub diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java new file mode 100644 index 00000000..69110cb6 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java @@ -0,0 +1,45 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class NoopFilter<T> extends ConsumerStage<T, T> { + + // @Override + // public void execute3() { + // T element = this.getInputPort().receive(); + // // this.getOutputPort().send(element); + // } + + @Override + protected void execute4(final CommittableQueue<T> elements) { + T element = elements.removeFromHead(); + this.execute5(element); + } + + @Override + protected void execute5(final T element) { + this.send(element); // "send" calls the next stage and so on + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java new file mode 100644 index 00000000..61d001cd --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java @@ -0,0 +1,99 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.util.ConstructorClosure; +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ObjectProducer<T> extends ProducerStage<Void, T> { + + private long numInputObjects; + private ConstructorClosure<T> inputObjectCreator; + + /** + * @since 1.10 + */ + public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) { + this.numInputObjects = numInputObjects; + this.inputObjectCreator = inputObjectCreator; + } + + public long getNumInputObjects() { + return this.numInputObjects; + } + + public void setNumInputObjects(final long numInputObjects) { + this.numInputObjects = numInputObjects; + } + + public ConstructorClosure<T> getInputObjectCreator() { + return this.inputObjectCreator; + } + + public void setInputObjectCreator(final ConstructorClosure<T> inputObjectCreator) { + this.inputObjectCreator = inputObjectCreator; + } + + // @Override + // protected void execute3() { + // if (this.numInputObjects == 0) { + // // this.getOutputPort().send((T) END_SIGNAL); + // return; + // } + // + // try { + // final T newObject = this.inputObjectCreator.call(); + // this.numInputObjects--; + // + // // this.getOutputPort().send(newObject); + // } catch (final Exception e) { + // throw new IllegalStateException(e); + // } + // } + + @Override + protected void execute4(final CommittableQueue<Void> elements) { + this.execute5(null); + } + + @Override + protected void execute5(final Void element) { + T newObject = null; + newObject = this.inputObjectCreator.create(); + this.numInputObjects--; + + if (this.numInputObjects == 0) { + this.setReschedulable(false); + // this.getOutputPort().pipe.close(); + } + + // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); + this.send(newObject); + } + + // @Override + // public void onIsPipelineHead() { + // // this.getOutputPort().pipe = null; // no performance increase + // super.onIsPipelineHead(); + // } + +} diff --git a/src/main/java/teetime/variant/methodcall/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java similarity index 80% rename from src/main/java/teetime/variant/methodcall/stage/Relay.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index f1a35209..c4e7a006 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -1,7 +1,7 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; public class Relay<T> extends AbstractStage<T, T> { @@ -22,12 +22,6 @@ public class Relay<T> extends AbstractStage<T, T> { this.send(element); } - @Override - public T execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override public void onIsPipelineHead() { System.out.println("onIsPipelineHead"); diff --git a/src/main/java/teetime/variant/methodcall/stage/Sink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java similarity index 57% rename from src/main/java/teetime/variant/methodcall/stage/Sink.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java index 89ff7c75..b4485b35 100644 --- a/src/main/java/teetime/variant/methodcall/stage/Sink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java @@ -1,16 +1,10 @@ -package teetime.variant.methodcall.stage; +package teetime.variant.methodcallWithPorts.stage; import teetime.util.list.CommittableQueue; -import teetime.variant.methodcall.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; public class Sink<T> extends ConsumerStage<T, T> { - @Override - public T execute(final Object element) { - // TODO Auto-generated method stub - return null; - } - @Override protected void execute4(final CommittableQueue<T> elements) { // TODO Auto-generated method stub diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java new file mode 100644 index 00000000..a35bf75f --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java @@ -0,0 +1,47 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.util.list.CommittableQueue; +import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class StartTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> { + + // @Override + // public void execute3() { + // TimestampObject element = this.getInputPort().receive(); + // element.setStartTimestamp(System.nanoTime()); + // // this.getOutputPort().send(element); + // } + + @Override + protected void execute4(final CommittableQueue<TimestampObject> elements) { + TimestampObject element = elements.removeFromHead(); + this.execute5(element); + } + + @Override + protected void execute5(final TimestampObject element) { + element.setStartTimestamp(System.nanoTime()); + this.send(element); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java new file mode 100644 index 00000000..a1601cb4 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java @@ -0,0 +1,47 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.util.list.CommittableQueue; +import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class StopTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> { + + // @Override + // public void execute3() { + // TimestampObject element = this.getInputPort().receive(); + // element.setStopTimestamp(System.nanoTime()); + // // this.getOutputPort().send(element); + // } + + @Override + protected void execute4(final CommittableQueue<TimestampObject> elements) { + TimestampObject element = elements.removeFromHead(); + this.execute5(element); + } + + @Override + protected void execute5(final TimestampObject element) { + element.setStopTimestamp(System.nanoTime()); + this.send(element); + } +} diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment01/MethodCallThroughputAnalysis1.java b/src/test/java/teetime/variant/methodcall/examples/experiment01/MethodCallThroughputAnalysis1.java index e53851df..f5a71bf9 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment01/MethodCallThroughputAnalysis1.java +++ b/src/test/java/teetime/variant/methodcall/examples/experiment01/MethodCallThroughputAnalysis1.java @@ -65,10 +65,13 @@ public class MethodCallThroughputAnalysis1 extends Analysis { @Override public void run() { while (true) { - TimestampObject object = objectProducer.execute(null); - if (object == null) { + if (!objectProducer.isReschedulable()) { return; } + TimestampObject object = objectProducer.execute(null); + // if (object == null) + // return; + object = startTimestampFilter.execute(object); for (final NoopFilter<TimestampObject> noopFilter : noopFilters) { object = noopFilter.execute(object); diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment02/MethodCallThroughputAnalysis2.java b/src/test/java/teetime/variant/methodcall/examples/experiment02/MethodCallThroughputAnalysis2.java index 1155da2e..85eaebca 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment02/MethodCallThroughputAnalysis2.java +++ b/src/test/java/teetime/variant/methodcall/examples/experiment02/MethodCallThroughputAnalysis2.java @@ -73,11 +73,6 @@ public class MethodCallThroughputAnalysis2 extends Analysis { pipeline.onStart(); - // pipeline.getInputPort().pipe = new Pipe<Void>(); - // pipeline.getInputPort().pipe.add(new Object()); - - // pipeline.getOutputPort().pipe = new Pipe<Void>(); - final Runnable runnable = new Runnable() { @Override public void run() { diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java similarity index 97% rename from src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java index a09a084c..a0aa656b 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThoughputTimestampAnalysis9Test.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment09; +package teetime.variant.methodcallWithPorts.examples.experiment09; import java.util.ArrayList; import java.util.List; diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java similarity index 98% rename from src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThroughputAnalysis9.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index 2ace0eb4..058b0b30 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment09; +package teetime.variant.methodcallWithPorts.examples.experiment09; import java.util.List; diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java similarity index 97% rename from src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java index 0c1e3e55..16d44a26 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThoughputTimestampAnalysis10Test.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment10; +package teetime.variant.methodcallWithPorts.examples.experiment10; import java.util.ArrayList; import java.util.List; diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java similarity index 98% rename from src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThroughputAnalysis10.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java index 35282b24..b5c71498 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment10/MethodCallThroughputAnalysis10.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment10; +package teetime.variant.methodcallWithPorts.examples.experiment10; import java.util.List; diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java similarity index 97% rename from src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java index 1f9d7099..a6f76d00 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThoughputTimestampAnalysis11Test.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment11; +package teetime.variant.methodcallWithPorts.examples.experiment11; import java.util.ArrayList; import java.util.List; diff --git a/src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java similarity index 87% rename from src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThroughputAnalysis11.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java index c544e75e..66514963 100644 --- a/src/test/java/teetime/variant/methodcall/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -13,21 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcall.examples.experiment11; +package teetime.variant.methodcallWithPorts.examples.experiment11; import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcall.framework.core.Pipeline; -import teetime.variant.methodcall.framework.core.RunnableStage; -import teetime.variant.methodcall.framework.core.pipe.UnorderedGrowablePipe; -import teetime.variant.methodcall.stage.CollectorSink; -import teetime.variant.methodcall.stage.NoopFilter; -import teetime.variant.methodcall.stage.ObjectProducer; -import teetime.variant.methodcall.stage.StartTimestampFilter; -import teetime.variant.methodcall.stage.StopTimestampFilter; +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; +import teetime.variant.methodcallWithPorts.stage.CollectorSink; +import teetime.variant.methodcallWithPorts.stage.NoopFilter; +import teetime.variant.methodcallWithPorts.stage.ObjectProducer; +import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter; +import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; /** * @author Christian Wulf diff --git a/src/test/java/what tests work.txt b/src/test/java/what tests work.txt new file mode 100644 index 00000000..5a29be3e --- /dev/null +++ b/src/test/java/what tests work.txt @@ -0,0 +1,17 @@ +ok +no +inf +inf +inf +no +no +no + + +ok +no + + + + + -- GitLab