diff --git a/jvm-flags.txt b/jvm-flags.txt new file mode 100644 index 0000000000000000000000000000000000000000..392db111c487dc13febf6dc73e2fd3d6fead4526 --- /dev/null +++ b/jvm-flags.txt @@ -0,0 +1,3 @@ + +-verbose:gc +-XX:+PrintCompilation \ No newline at end of file diff --git a/src/main/java/teetime/framework/core/Analysis.java b/src/main/java/teetime/framework/core/Analysis.java index a5726fab4d6cb4caebbc4e0e15d1d226200deed5..da54823a29f38f25610dd96703001e28c3738a72 100644 --- a/src/main/java/teetime/framework/core/Analysis.java +++ b/src/main/java/teetime/framework/core/Analysis.java @@ -24,7 +24,7 @@ package teetime.framework.core; public class Analysis { public void init() { - + System.out.println("Analysis initialized."); } public void start() { diff --git a/src/main/java/teetime/util/ListUtil.java b/src/main/java/teetime/util/ListUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..15df95e36b9584a5d10edd80b7b5f031fa814105 --- /dev/null +++ b/src/main/java/teetime/util/ListUtil.java @@ -0,0 +1,20 @@ +package teetime.util; + +import java.util.Collection; +import java.util.List; + +public class ListUtil { + + private ListUtil() { + // utility class + } + + public static <T> List<T> merge(final List<List<T>> listOfLists) { + List<T> resultList = listOfLists.get(0); + for (int i = 1; i < listOfLists.size(); i++) { + Collection<? extends T> timestampObjectList = listOfLists.get(i); + resultList.addAll(timestampObjectList); + } + return resultList; + } +} diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java index e950ddac22ff0e1bccad8105f06bb6e009ced7bb..d6c1aa27e26cbc42ab04946a6a3519a6c871ccc5 100644 --- a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java @@ -15,7 +15,6 @@ ***************************************************************************/ package teetime.examples.throughput; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; @@ -23,6 +22,7 @@ import org.junit.Before; import org.junit.Test; import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis16; +import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; @@ -67,11 +67,7 @@ public class MethodCallThoughputTimestampAnalysis16Test { analysis.onTerminate(); } - // merge - List<TimestampObject> timestampObjects = new LinkedList<TimestampObject>(); - for (List<TimestampObject> timestampObjectList : analysis.getTimestampObjectsList()) { - timestampObjects.addAll(timestampObjectList); - } + List<TimestampObject> timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects); } } diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java index 3fb6736d721a6b8f58d9adba588564d1115fdd13..23964a0550037fa84e69c86e83f68747755f96d9 100644 --- a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java @@ -15,14 +15,14 @@ ***************************************************************************/ package teetime.examples.throughput; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; import org.junit.Before; import org.junit.Test; +import teetime.examples.throughput.methodcall.Closure; import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis17; +import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; @@ -51,14 +51,15 @@ public class MethodCallThoughputTimestampAnalysis17Test { final MethodCallThroughputAnalysis17 analysis = new MethodCallThroughputAnalysis17(); analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { + analysis.setInput(NUM_OBJECTS_TO_CREATE, new Closure<Void, TimestampObject>() { @Override - public TimestampObject call() throws Exception { + public TimestampObject execute(final Void element) { return new TimestampObject(); } }); analysis.init(); + System.out.println("starting"); stopWatch.start(); try { analysis.start(); @@ -67,12 +68,7 @@ public class MethodCallThoughputTimestampAnalysis17Test { analysis.onTerminate(); } - // merge - List<TimestampObject> timestampObjects = analysis.getTimestampObjectsList().get(0); - for (int i = 1; i < analysis.getTimestampObjectsList().size(); i++) { - Collection<? extends TimestampObject> timestampObjectList = analysis.getTimestampObjectsList().get(i); - timestampObjects.addAll(timestampObjectList); - } + List<TimestampObject> timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects); } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/Closure.java b/src/test/java/teetime/examples/throughput/methodcall/Closure.java new file mode 100644 index 0000000000000000000000000000000000000000..de9c4e64e158acf5dc2f6db6e9b415731f2872b6 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/Closure.java @@ -0,0 +1,6 @@ +package teetime.examples.throughput.methodcall; + +public interface Closure<I, O> { + + O execute(I element); +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java index a0f7fd12e6c4352b4e2e34296cf150d97e77e10b..86cfc65f623018d47ebff2bfbb12e3a415ea7793 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java @@ -54,8 +54,8 @@ public class MethodCallThroughputAnalysis16 extends Analysis { @Override public void init() { super.init(); - Runnable producerRunnable = this.buildProducerPipeline(); - this.producerThread = new Thread(producerRunnable); + Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(); + this.producerThread = new Thread(new RunnableStage(producerPipeline)); int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose @@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(this.distributor, resultList); + Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList); this.workerThreads[i] = new Thread(workerRunnable); } @@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { } } - private Runnable buildProducerPipeline() { + private Pipeline<Void, TimestampObject> buildProducerPipeline() { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator); this.distributor = new Distributor<TimestampObject>(); @@ -88,14 +88,14 @@ public class MethodCallThroughputAnalysis16 extends Analysis { UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } /** * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) { + private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -114,7 +114,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - SpScPipe.connect(distributor.getNewOutputPort(), relay.getInputPort()); + SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort()); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java index 1d41cca7dd303ca9eb5e053f9e1e12ab82fdd654..81340bfedc1113a285c1554a717d7a0306054f81 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java @@ -18,14 +18,16 @@ package teetime.examples.throughput.methodcall; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Callable; import teetime.examples.throughput.TimestampObject; import teetime.examples.throughput.methodcall.stage.CollectorSink; import teetime.examples.throughput.methodcall.stage.Distributor; +import teetime.examples.throughput.methodcall.stage.EndStage; import teetime.examples.throughput.methodcall.stage.NoopFilter; +import teetime.examples.throughput.methodcall.stage.ObjectProducer; import teetime.examples.throughput.methodcall.stage.Pipeline; import teetime.examples.throughput.methodcall.stage.Relay; +import teetime.examples.throughput.methodcall.stage.Sink; import teetime.examples.throughput.methodcall.stage.StartTimestampFilter; import teetime.examples.throughput.methodcall.stage.StopTimestampFilter; import teetime.framework.core.Analysis; @@ -40,21 +42,18 @@ public class MethodCallThroughputAnalysis17 extends Analysis { private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); private int numInputObjects; - private Callable<TimestampObject> inputObjectCreator; + private Closure<Void, TimestampObject> inputObjectCreator; private int numNoopFilters; private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Distributor<TimestampObject> distributor; private Thread producerThread; - private Thread[] workerThreads; @Override public void init() { - super.init(); - // Runnable producerRunnable = this.buildProducerPipeline(); - // this.producerThread = new Thread(producerRunnable); + final Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); + // this.producerThread = new Thread(new RunnableStage(producerPipeline)); int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose @@ -63,38 +62,86 @@ public class MethodCallThroughputAnalysis17 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(this.distributor, resultList); + Runnable workerRunnable = this.buildPipeline(null, resultList); this.workerThreads[i] = new Thread(workerRunnable); } + // this.producerThread = new Thread(new Runnable() { + // @Override + // public void run() { + // TimestampObject ts; + // try { + // ts = MethodCallThroughputAnalysis17.this.inputObjectCreator.call(); + // System.out.println("test" + producerPipeline + ", # filters: " + MethodCallThroughputAnalysis17.this.numNoopFilters + ", ts: " + // + ts); + // MethodCallThroughputAnalysis17.this.numInputObjects++; + // System.out.println("numInputObjects: " + MethodCallThroughputAnalysis17.this.numInputObjects); + // MethodCallThroughputAnalysis17.this.numInputObjects--; + // } catch (Exception e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + // System.out.println("run end"); + // } + // }); + // this.producerThread.start(); - // + // this.producerThread.run(); + new RunnableStage(producerPipeline).run(); + + // Pipeline<Void, TimestampObject> stage = producerPipeline; + // stage.onStart(); + // do { + // stage.executeWithPorts(); + // } while (stage.isReschedulable()); + // try { // this.producerThread.join(); // } catch (InterruptedException e1) { // // TODO Auto-generated catch block // e1.printStackTrace(); // } + + // try { + // Thread.sleep(1000); + // } catch (InterruptedException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + + super.init(); } - // private Runnable buildProducerPipeline() { - // final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator); - // this.distributor = new Distributor<TimestampObject>(); - // - // final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); - // pipeline.setFirstStage(objectProducer); - // pipeline.setLastStage(this.distributor); - // - // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); - // - // return new RunnableStage(pipeline); - // } + private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) { + final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); + Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); + Sink<TimestampObject> sink = new Sink<TimestampObject>(); + EndStage<Void> endStage = new EndStage<Void>(); + endStage.closure = inputObjectCreator; + + final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + pipeline.setFirstStage(objectProducer); + // pipeline.setFirstStage(sink); + // pipeline.setFirstStage(endStage); + + pipeline.setLastStage(distributor); + // pipeline.setLastStage(sink); + pipeline.setLastStage(new EndStage<TimestampObject>()); + + // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort()); + // objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); + + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); + distributor.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); + + return pipeline; + } /** * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) { + private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -116,7 +163,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { IPipe<TimestampObject> startPipe = new SpScPipe<TimestampObject>(); try { for (int i = 0; i < this.numInputObjects; i++) { - startPipe.add(this.inputObjectCreator.call()); + startPipe.add(this.inputObjectCreator.execute(null)); } startPipe.close(); } catch (Exception e) { @@ -124,6 +171,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { e.printStackTrace(); } relay.getInputPort().pipe = startPipe; + // previousStage.getOutputPort().pipe = startPipe; UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); @@ -155,7 +203,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { } } - public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) { + public void setInput(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; } diff --git a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java index 27faf320bd5f750f337ecd56f15533ca60ab7dd1..731075b1147402c7053b653b16c29280c9a54c7a 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java @@ -15,7 +15,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { boolean outputIsEmpty = outputElements.isEmpty(); if (outputIsEmpty) { - this.disable(); + this.getOutputPort().pipe.close(); } return outputElements; diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java index e2f868bbd427640e9ceb0923163c1b576ca809ab..a1d926aaa8f6f9d9d13ad235e38827ee16d7add2 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java @@ -12,15 +12,15 @@ public interface Stage<I, O> { CommittableQueue<O> execute2(CommittableQueue<I> elements); - SchedulingInformation getSchedulingInformation(); + // SchedulingInformation getSchedulingInformation(); - Stage getParentStage(); + Stage<?, ?> getParentStage(); - void setParentStage(Stage parentStage, int index); + void setParentStage(Stage<?, ?> parentStage, int index); - void setListener(OnDisableListener listener); + // void setListener(OnDisableListener listener); - Stage next(); + Stage<?, ?> next(); void setSuccessor(Stage<?, ?> successor); diff --git a/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java index b2c2edffb9e979772397c51e1672df2f41ae6b5e..1f9032ce9c5a2327f541d38544423f59b36b9913 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java @@ -58,6 +58,7 @@ public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { @Override public T removeLast() { T element = this.elements[--this.lastFreeIndex]; + this.elements[this.lastFreeIndex] = null; // T element = this.elements.get(--this.lastFreeIndex); return element; } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java index fa9fa3709c1a89fcc771e948d646d3152ab21be5..4b5502f79562f5d5cad330009a8cdfec3c45d574 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java @@ -1,30 +1,24 @@ package teetime.examples.throughput.methodcall.stage; import teetime.examples.throughput.methodcall.InputPort; -import teetime.examples.throughput.methodcall.OnDisableListener; import teetime.examples.throughput.methodcall.OutputPort; -import teetime.examples.throughput.methodcall.SchedulingInformation; import teetime.examples.throughput.methodcall.Stage; import teetime.examples.throughput.methodcall.StageWithPort; import teetime.util.list.CommittableQueue; -import teetime.util.list.CommittableResizableArrayQueue; public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { private final InputPort<I> inputPort = new InputPort<I>(); private final OutputPort<O> outputPort = new OutputPort<O>(); - protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); + // protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); + protected final CommittableQueue<O> outputElements = null; - private final SchedulingInformation schedulingInformation = new SchedulingInformation(); - - private Stage parentStage; - - private OnDisableListener listener; + private Stage<?, ?> parentStage; private int index; - private StageWithPort successor; + private StageWithPort<?, ?> successor; private boolean reschedulable; @@ -34,7 +28,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { if (result == null) { return null; } - StageWithPort next = this.next(); + StageWithPort<?, ?> next = this.next(); // if (next != null) { // return next.executeRecursively(result); // } else { @@ -100,31 +94,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { // CommittableQueue execute; + StageWithPort<?, ?> next = this.next(); do { // execute = this.next().execute2(this.outputElements); // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); - this.next().executeWithPorts(); + next.executeWithPorts(); // System.out.println("Executed " + this.next().getClass().getSimpleName()); - } while (this.next().isReschedulable()); + } while (next.isReschedulable()); // } while (this.next().getInputPort().pipe.size() > 0); // } while (execute.size() > 0); } - @Override - public SchedulingInformation getSchedulingInformation() { - return this.schedulingInformation; - } + // @Override + // public SchedulingInformation getSchedulingInformation() { + // return this.schedulingInformation; + // } - public void disable() { - this.schedulingInformation.setActive(false); - this.fireOnDisable(); - } + // public void disable() { + // this.schedulingInformation.setActive(false); + // this.fireOnDisable(); + // } - private void fireOnDisable() { - if (this.listener != null) { - this.listener.onDisable(this, this.index); - } - } + // private void fireOnDisable() { + // if (this.listener != null) { + // this.listener.onDisable(this, this.index); + // } + // } @Override public void onStart() { @@ -132,27 +127,18 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { } @Override - public Stage getParentStage() { + public Stage<?, ?> getParentStage() { return this.parentStage; } @Override - public void setParentStage(final Stage parentStage, final int index) { + public void setParentStage(final Stage<?, ?> parentStage, final int index) { this.index = index; this.parentStage = parentStage; } - public OnDisableListener getListener() { - return this.listener; - } - - @Override - public void setListener(final OnDisableListener listener) { - this.listener = listener; - } - @Override - public StageWithPort next() { + public StageWithPort<?, ?> next() { return this.successor; } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java index ca66406587942fcea87b31650796c7ff39a6bc5b..628dd181cf076f972fca5f969c8685b109e00516 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java @@ -44,6 +44,12 @@ public class Distributor<T> extends ConsumerStage<T, T> { op.pipe.close(); System.out.println("End signal sent, size: " + op.pipe.size()); } + + // for (OutputPort<?> op : this.outputPorts) { + // op.pipe = null; + // } + // this.outputPorts = null; + // this.outputPortList.clear(); } @SuppressWarnings("unchecked") @@ -57,7 +63,12 @@ public class Distributor<T> extends ConsumerStage<T, T> { System.out.println("outputPorts: " + this.outputPorts); } - public OutputPort<T> getNewOutputPort() { + @Override + public OutputPort<T> getOutputPort() { + return this.getNewOutputPort(); + } + + private OutputPort<T> getNewOutputPort() { OutputPort<T> outputPort = new OutputPort<T>(); this.outputPortList.add(outputPort); return outputPort; diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java index 9004c1c33afe317efed263bcde60158d5650d381..23439c4384f50a5c6e484a52c9dfe9f912575a80 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java @@ -1,9 +1,11 @@ package teetime.examples.throughput.methodcall.stage; +import java.util.LinkedList; +import java.util.List; + +import teetime.examples.throughput.methodcall.Closure; import teetime.examples.throughput.methodcall.InputPort; -import teetime.examples.throughput.methodcall.OnDisableListener; import teetime.examples.throughput.methodcall.OutputPort; -import teetime.examples.throughput.methodcall.SchedulingInformation; import teetime.examples.throughput.methodcall.Stage; import teetime.examples.throughput.methodcall.StageWithPort; import teetime.util.list.CommittableQueue; @@ -15,6 +17,10 @@ public class EndStage<T> implements StageWithPort<T, T> { return this.execute(element); } + public int count; + public Closure<Void, ?> closure; + public List<Object> list = new LinkedList<Object>(); + @Override public T execute(final Object element) { return (T) element; @@ -31,12 +37,6 @@ public class EndStage<T> implements StageWithPort<T, T> { return null; } - @Override - public SchedulingInformation getSchedulingInformation() { - // TODO Auto-generated method stub - return null; - } - @Override public Stage getParentStage() { // TODO Auto-generated method stub @@ -49,12 +49,6 @@ public class EndStage<T> implements StageWithPort<T, T> { } - @Override - public void setListener(final OnDisableListener listener) { - // TODO Auto-generated method stub - - } - @Override public Stage next() { // TODO Auto-generated method stub @@ -87,8 +81,11 @@ public class EndStage<T> implements StageWithPort<T, T> { @Override public void executeWithPorts() { - // TODO Auto-generated method stub - + // this.getInputPort().receive(); // just consume + // do nothing + // this.count++; + // Object r = this.closure.execute(null); + // this.list.add(r); } @Override diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java b/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java index c31be8165a9f528e9caa337c31de8f66d88afaa0..86ee06067edfb243a58a171d4489d1ab66d93c2f 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java @@ -15,8 +15,7 @@ ***************************************************************************/ package teetime.examples.throughput.methodcall.stage; -import java.util.concurrent.Callable; - +import teetime.examples.throughput.methodcall.Closure; import teetime.examples.throughput.methodcall.ProducerStage; import teetime.util.list.CommittableQueue; @@ -28,12 +27,12 @@ import teetime.util.list.CommittableQueue; public class ObjectProducer<T> extends ProducerStage<Void, T> { private long numInputObjects; - private Callable<T> inputObjectCreator; + private Closure<Void, T> inputObjectCreator; /** * @since 1.10 */ - public ObjectProducer(final long numInputObjects, final Callable<T> inputObjectCreator) { + public ObjectProducer(final long numInputObjects, final Closure<Void, T> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; } @@ -46,7 +45,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { } try { - final T newObject = this.inputObjectCreator.call(); + // final T newObject = this.inputObjectCreator.call(); + final T newObject = null; this.numInputObjects--; return newObject; @@ -63,11 +63,11 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { this.numInputObjects = numInputObjects; } - public Callable<T> getInputObjectCreator() { + public Closure<Void, T> getInputObjectCreator() { return this.inputObjectCreator; } - public void setInputObjectCreator(final Callable<T> inputObjectCreator) { + public void setInputObjectCreator(final Closure<Void, T> inputObjectCreator) { this.inputObjectCreator = inputObjectCreator; } @@ -100,14 +100,21 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { return; } - try { - final T newObject = this.inputObjectCreator.call(); - this.numInputObjects--; - // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); - this.send(newObject); - } catch (final Exception e) { - throw new IllegalStateException(e); - } + // try { + T newObject = null; + newObject = this.inputObjectCreator.execute(null); + this.numInputObjects--; + // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); + this.send(newObject); + // } catch (final Exception e) { + // throw new IllegalStateException(e); + // } + } + + @Override + public void onIsPipelineHead() { + // this.getOutputPort().pipe = null; // no performance increase + super.onIsPipelineHead(); } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java index 884c39d9de026d78ab77c42601f05389c44b6517..d77aa89906af2ebd7c467e32964a1059cf828cfe 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java @@ -5,18 +5,17 @@ import java.util.LinkedList; import java.util.List; import teetime.examples.throughput.methodcall.InputPort; -import teetime.examples.throughput.methodcall.OnDisableListener; import teetime.examples.throughput.methodcall.OutputPort; import teetime.examples.throughput.methodcall.SchedulingInformation; import teetime.examples.throughput.methodcall.Stage; import teetime.examples.throughput.methodcall.StageWithPort; import teetime.util.list.CommittableQueue; -public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { +public class Pipeline<I, O> implements StageWithPort<I, O> { - private StageWithPort firstStage; + private StageWithPort<I, ?> firstStage; private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>(); - private StageWithPort lastStage; + private StageWithPort<?, O> lastStage; private final SchedulingInformation schedulingInformation = new SchedulingInformation(); @@ -24,7 +23,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { private Stage parentStage; private int index; private int startIndex; - private OnDisableListener listener; private boolean reschedulable; private int firstStageIndex; @@ -75,12 +73,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { // this.setReschedulable(stage.isReschedulable()); } - private final void updateRescheduable(Stage stage) { + private final void updateRescheduable(Stage<?, ?> stage) { while (!stage.isReschedulable()) { this.firstStageIndex++; stage = stage.next(); if (stage == null) { // loop reaches the last stage this.setReschedulable(false); + this.cleanUp(); return; } stage.onIsPipelineHead(); @@ -134,7 +133,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { for (int i = 0; i < this.stages.length; i++) { StageWithPort<?, ?> stage = this.stages[i]; stage.setParentStage(this, i); - stage.setListener(this); + // stage.setListener(this); } for (int i = 0; i < this.stages.length - 1; i++) { @@ -154,11 +153,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { // return this.firstStage.getInputPort(); // } - @Override - public SchedulingInformation getSchedulingInformation() { - return this.schedulingInformation; - } - @Override public Stage getParentStage() { return this.parentStage; @@ -170,34 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { this.parentStage = parentStage; } - @Override - public void onDisable(final Stage stage, final int index) { - this.startIndex = index + 1; - if (this.startIndex == this.stages.length) { - this.disable(); - } - } - - public void disable() { - this.schedulingInformation.setActive(false); - this.fireOnDisable(); - } - - private void fireOnDisable() { - if (this.listener != null) { - this.listener.onDisable(this, this.index); - } - } - - public OnDisableListener getListener() { - return this.listener; - } - - @Override - public void setListener(final OnDisableListener listener) { - this.listener = listener; - } - @Override public O execute(final Object element) { throw new IllegalStateException(); @@ -237,9 +203,20 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { return this.lastStage.getOutputPort(); } - // @Override - // public OutputPort getOutputPort() { - // return this.lastStage.getOutputPort(); - // } + // TODO remove since it does not increase performances + private void cleanUp() { + for (int i = 0; i < this.stages.length; i++) { + StageWithPort<?, ?> stage = this.stages[i]; + stage.setParentStage(null, i); + // stage.setListener(null); + stage.setSuccessor(null); + } + + this.firstStage = null; + this.intermediateStages.clear(); + this.lastStage = null; + + System.out.println("cleaned up"); + } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Sink.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Sink.java new file mode 100644 index 0000000000000000000000000000000000000000..4b358384eac52e356abcc8082726d6b729f31bf3 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Sink.java @@ -0,0 +1,25 @@ +package teetime.examples.throughput.methodcall.stage; + +import teetime.examples.throughput.methodcall.ConsumerStage; +import teetime.util.list.CommittableQueue; + +public class Sink<T> extends ConsumerStage<T, T> { + + @Override + public T execute(final Object element) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected void execute4(final CommittableQueue<T> elements) { + // TODO Auto-generated method stub + + } + + @Override + protected void execute5(final T element) { + // do nothing + } + +}