diff --git a/src/main/java/teetime/framework/core/Analysis.java b/src/main/java/teetime/framework/core/Analysis.java index 915773cde27abdb717bdb3070e48b2bd1edccf3e..4fc4782a79028cffdfa89c075b7182bc515329e4 100644 --- a/src/main/java/teetime/framework/core/Analysis.java +++ b/src/main/java/teetime/framework/core/Analysis.java @@ -31,7 +31,7 @@ public class Analysis { } - public void terminate() { + public void onTerminate() { } } diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java new file mode 100644 index 0000000000000000000000000000000000000000..e950ddac22ff0e1bccad8105f06bb6e009ced7bb --- /dev/null +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis16Test.java @@ -0,0 +1,77 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Before; +import org.junit.Test; + +import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis16; +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +import kieker.common.logging.LogFactory; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThoughputTimestampAnalysis16Test { + + private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; + + @Before + public void before() { + System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE"); + } + + @Test + public void testWithManyObjects() { + System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); + final StopWatch stopWatch = new StopWatch(); + + final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16(); + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); + analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { + @Override + public TimestampObject call() throws Exception { + return new TimestampObject(); + } + }); + analysis.init(); + + stopWatch.start(); + try { + analysis.start(); + } finally { + stopWatch.end(); + analysis.onTerminate(); + } + + // merge + List<TimestampObject> timestampObjects = new LinkedList<TimestampObject>(); + for (List<TimestampObject> timestampObjectList : analysis.getTimestampObjectsList()) { + timestampObjects.addAll(timestampObjectList); + } + StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects); + } +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis11.java index aa726513a81654e14fc18affebc167dc6e6aaf79..b8093dbbdf0c074c5a672d5b8a6a878245403715 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis11.java @@ -62,22 +62,26 @@ public class MethodCallThroughputAnalysis11 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); + // Relay<TimestampObject> relay = new Relay<TimestampObject>(); + final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>(); pipeline.setFirstStage(objectProducer); + // pipeline.addIntermediateStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - FixedSizedPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - FixedSizedPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), relay.getInputPort()); + // UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); + + UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - FixedSizedPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - FixedSizedPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - FixedSizedPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - - pipeline.onStart(); + UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); // pipeline.getInputPort().pipe = new Pipe<Void>(); // pipeline.getInputPort().pipe.add(new Object()); @@ -87,6 +91,7 @@ public class MethodCallThroughputAnalysis11 extends Analysis { final Runnable runnable = new Runnable() { @Override public void run() { + pipeline.onStart(); do { pipeline.executeWithPorts(); } while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable()); diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java index 97604c13e984db4d11d2ba8e854846392eec1cef..e16c07a5dbba0e0aea02b5ccaffd7a2dbbc6d99f 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java @@ -105,15 +105,15 @@ public class MethodCallThroughputAnalysis15 extends Analysis { SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); - FixedSizedPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - FixedSizedPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - FixedSizedPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - FixedSizedPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - FixedSizedPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); + UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); - FixedSizedPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); + UnorderedGrowablePipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); pipeline.onStart(); diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java new file mode 100644 index 0000000000000000000000000000000000000000..02a812a73352f43ede46d23a434e73c36ac73159 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java @@ -0,0 +1,188 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput.methodcall; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; + +import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.CollectorSink; +import teetime.examples.throughput.methodcall.stage.Distributor; +import teetime.examples.throughput.methodcall.stage.NoopFilter; +import teetime.examples.throughput.methodcall.stage.ObjectProducer; +import teetime.examples.throughput.methodcall.stage.Pipeline; +import teetime.examples.throughput.methodcall.stage.Relay; +import teetime.examples.throughput.methodcall.stage.StartTimestampFilter; +import teetime.examples.throughput.methodcall.stage.StopTimestampFilter; +import teetime.framework.core.Analysis; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThroughputAnalysis16 extends Analysis { + + private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); + + private int numInputObjects; + private Callable<TimestampObject> inputObjectCreator; + private int numNoopFilters; + + private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); + + private Distributor<TimestampObject> distributor; + private Thread producerThread; + + private Thread[] workerThreads; + + @Override + public void init() { + super.init(); + Runnable producerRunnable = this.buildProducerPipeline(); + this.producerThread = new Thread(producerRunnable); + + int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose + + this.workerThreads = new Thread[numWorkerThreads]; + for (int i = 0; i < this.workerThreads.length; i++) { + List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); + this.timestampObjectsList.add(resultList); + + Runnable workerRunnable = this.buildPipeline(this.distributor, resultList); + this.workerThreads[i] = new Thread(workerRunnable); + } + + this.producerThread.start(); + + try { + this.producerThread.join(); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + + private Runnable buildProducerPipeline() { + final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator); + this.distributor = new Distributor<TimestampObject>(); + + final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + pipeline.setFirstStage(objectProducer); + pipeline.setLastStage(this.distributor); + + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); + + final Runnable runnable = new Runnable() { + @Override + public void run() { + pipeline.onStart(); + do { + pipeline.executeWithPorts(); + } while (pipeline.isReschedulable()); + // System.out.println("buildProducerPipeline finished"); + } + }; + + return runnable; + } + + /** + * @param numNoopFilters + * @since 1.10 + */ + private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) { + Relay<TimestampObject> relay = new Relay<TimestampObject>(); + @SuppressWarnings("unchecked") + final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; + // create stages + final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); + for (int i = 0; i < noopFilters.length; i++) { + noopFilters[i] = new NoopFilter<TimestampObject>(); + } + final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); + + final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>(); + pipeline.setFirstStage(relay); + pipeline.addIntermediateStage(startTimestampFilter); + pipeline.addIntermediateStages(noopFilters); + pipeline.addIntermediateStage(stopTimestampFilter); + pipeline.setLastStage(collectorSink); + + OrderedGrowableArrayPipe.connect(distributor.getNewOutputPort(), relay.getInputPort()); + + UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); + + UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + for (int i = 0; i < noopFilters.length - 1; i++) { + UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + } + UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + + final Runnable runnable = new Runnable() { + @Override + public void run() { + pipeline.onStart(); + do { + pipeline.executeWithPorts(); + } while (pipeline.isReschedulable()); + // System.out.println("buildPipeline finished"); + } + }; + + return runnable; + } + + @Override + public void start() { + super.start(); + + for (Thread workerThread : this.workerThreads) { + workerThread.start(); + } + + try { + for (Thread workerThread : this.workerThreads) { + workerThread.join(); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) { + this.numInputObjects = numInputObjects; + this.inputObjectCreator = inputObjectCreator; + } + + public int getNumNoopFilters() { + return this.numNoopFilters; + } + + public void setNumNoopFilters(final int numNoopFilters) { + this.numNoopFilters = numNoopFilters; + } + + public List<List<TimestampObject>> getTimestampObjectsList() { + return this.timestampObjectsList; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..e23a519d7f0109e1095eb0e7ab8dd8c01f256d2c --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java @@ -0,0 +1,54 @@ +package teetime.examples.throughput.methodcall; + +import teetime.util.concurrent.workstealing.CircularArray; + +public class OrderedGrowableArrayPipe<T> implements IPipe<T> { + + private CircularArray<T> elements; + private int head; + private int tail; + + public OrderedGrowableArrayPipe() { + this(17); + } + + public OrderedGrowableArrayPipe(final int initialCapacity) { + this.elements = new CircularArray<T>(initialCapacity); + } + + public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + IPipe<T> pipe = new OrderedGrowableArrayPipe<T>(); + sourcePort.pipe = pipe; + targetPort.pipe = pipe; + } + + @Override + public void add(final T element) { + this.elements.put(this.tail++, element); + } + + @Override + public T removeLast() { + if (this.head < this.tail) { + return this.elements.get(this.head++); + } else { + return null; + } + } + + @Override + public boolean isEmpty() { + return this.size() == 0; + } + + @Override + public T readLast() { + return this.elements.get(this.head); + } + + @Override + public int size() { + return this.tail - this.head; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java new file mode 100644 index 0000000000000000000000000000000000000000..8cdc34f76fb1469c190fc04952671d22d2667fd1 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java @@ -0,0 +1,48 @@ +package teetime.examples.throughput.methodcall; + +import java.util.LinkedList; + +public class OrderedGrowablePipe<T> implements IPipe<T> { + + private LinkedList<T> elements; + + public OrderedGrowablePipe() { + this(100000); + } + + public OrderedGrowablePipe(final int initialCapacity) { + this.elements = new LinkedList<T>(); + } + + public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + IPipe<T> pipe = new OrderedGrowablePipe<T>(); + sourcePort.pipe = pipe; + targetPort.pipe = pipe; + } + + @Override + public void add(final T element) { + this.elements.add(element); + } + + @Override + public T removeLast() { + return this.elements.removeFirst(); + } + + @Override + public boolean isEmpty() { + return this.elements.isEmpty(); + } + + @Override + public T readLast() { + return this.elements.getFirst(); + } + + @Override + public int size() { + return this.elements.size(); + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java index f499a2373ac41cffea73a56f935f5a54db1505c8..37710721d29f3d4c339ca455669f4101068621ca 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java @@ -4,7 +4,7 @@ import teetime.util.concurrent.spsc.FFBufferOrdered3; public class SpScPipe<T> implements IPipe<T> { - private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(4); + private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(100010); public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new SpScPipe<T>(); diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java index 06b95eb4a59f02ad6db1edb277d7b0c828328233..9f183595a15562ab3389517d25ce58c74501bf08 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java @@ -30,4 +30,6 @@ public interface Stage<I, O> { void onIsPipelineHead(); + void onStart(); + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java similarity index 92% rename from src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java rename to src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java index ecec0c0d925d35ea0156140aab723ab6ef8d2bc5..383145e3e343d7e83bca2ac0b401a9706857ff0d 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java @@ -1,6 +1,6 @@ package teetime.examples.throughput.methodcall; -public class FixedSizedPipe<T> implements IPipe<T> { +public class UnorderedGrowablePipe<T> implements IPipe<T> { // private static final class ArrayWrapper2<T> { // @@ -34,13 +34,13 @@ public class FixedSizedPipe<T> implements IPipe<T> { private int lastFreeIndex; @SuppressWarnings("unchecked") - public FixedSizedPipe() { + public UnorderedGrowablePipe() { this.MIN_CAPACITY = 4; this.elements = (T[]) new Object[this.MIN_CAPACITY]; } public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new FixedSizedPipe<T>(); + IPipe<T> pipe = new UnorderedGrowablePipe<T>(); sourcePort.pipe = pipe; targetPort.pipe = pipe; } @@ -80,6 +80,7 @@ public class FixedSizedPipe<T> implements IPipe<T> { private T[] grow() { int newSize = this.elements.length * 2; + // System.out.println("growing to " + newSize); return this.newArray(newSize); } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java index 14881d714ac0fad728babac07082f6f8b43c6eff..fa9fa3709c1a89fcc771e948d646d3152ab21be5 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java @@ -126,6 +126,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { } } + @Override + public void onStart() { + // empty default implementation + } + @Override public Stage getParentStage() { return this.parentStage; diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java b/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java index 6d844a02dec40403b5b9162d23f396b346d8e30a..2dfce1365845431b13a62a827310adf72bf8ac80 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java @@ -45,6 +45,11 @@ public class CollectorSink<T> extends ConsumerStage<T, Object> { return continueSignal; } + @Override + public void onIsPipelineHead() { + System.out.println("size: " + this.elements.size()); + } + // @Override // public void execute3() { // T element = this.getInputPort().receive(); diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java index 870ff604bf097342a2b0a14875f01800c1368ce6..b1c590777c8b843fcf740dedd029486250f74adf 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java @@ -26,7 +26,8 @@ public class Delay<I> extends AbstractStage<I, I> { this.send(element); } - this.setReschedulable(this.getInputPort().pipe.size() > 0); + // this.setReschedulable(this.getInputPort().pipe.size() > 0); + this.setReschedulable(false); // System.out.println("delay: " + this.getInputPort().pipe.size()); } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java new file mode 100644 index 0000000000000000000000000000000000000000..894c734ad5b232c604a73f721d004a09931e9340 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java @@ -0,0 +1,66 @@ +package teetime.examples.throughput.methodcall.stage; + +import java.util.ArrayList; +import java.util.List; + +import teetime.examples.throughput.methodcall.ConsumerStage; +import teetime.examples.throughput.methodcall.OutputPort; +import teetime.util.concurrent.spsc.Pow2; +import teetime.util.list.CommittableQueue; + +public class Distributor<T> extends ConsumerStage<T, T> { + + private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>(); + + private OutputPort<T>[] outputPorts; + private int nextOutputPortIndex; + + private int size; + + private int mask; + + @Override + public T execute(final Object element) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected void execute4(final CommittableQueue<T> elements) { + // TODO Auto-generated method stub + + } + + @Override + protected void execute5(final T element) { + OutputPort<T> outputPort = this.outputPorts[this.nextOutputPortIndex % this.size]; + this.nextOutputPortIndex++; + outputPort.send(element); + } + + @Override + public void onIsPipelineHead() { + for (OutputPort op : this.outputPorts) { + op.send(END_SIGNAL); + System.out.println("End signal sent, size: " + op.pipe.size() + ", end signal:" + (op.pipe.readLast() == END_SIGNAL)); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onStart() { + this.size = this.outputPortList.size(); + // this.mask = this.size - 1; + + int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far + this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]); + System.out.println("outputPorts: " + this.outputPorts); + } + + public OutputPort<T> getNewOutputPort() { + OutputPort<T> outputPort = new OutputPort<T>(); + this.outputPortList.add(outputPort); + return outputPort; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java index 7fe7b31705716732ee27d6bb3440ca4bc10b6708..9004c1c33afe317efed263bcde60158d5650d381 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java @@ -97,4 +97,10 @@ public class EndStage<T> implements StageWithPort<T, T> { } + @Override + public void onStart() { + // TODO Auto-generated method stub + + } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java index 87926f5698e4a9251eed0f8f99005646e071696a..884c39d9de026d78ab77c42601f05389c44b6517 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java @@ -99,6 +99,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { return this.stages[0].executeRecursively(element); } + @Override public void onStart() { // Pipe pipe = new Pipe(); // this.outputPort.pipe = pipe; @@ -141,6 +142,10 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { stage.setSuccessor(this.stages[i + 1]); } this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); + + for (StageWithPort<?, ?> stage : this.stages) { + stage.onStart(); + } } // diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java new file mode 100644 index 0000000000000000000000000000000000000000..8cff7774c43a9789195492aa5084c258af0876c3 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java @@ -0,0 +1,48 @@ +package teetime.examples.throughput.methodcall.stage; + +import teetime.util.list.CommittableQueue; + +public class Relay<T> extends AbstractStage<T, T> { + + public Relay() { + this.setReschedulable(true); + } + + @Override + public void executeWithPorts() { + T element = this.getInputPort().receive(); + if (null == element) { + return; + } else if (END_SIGNAL == element) { + this.setReschedulable(false); + System.out.println("got end signal; pipe.size: " + this.getInputPort().pipe.size()); + return; + } + + this.send(element); + } + + @Override + public T execute(final Object element) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void onIsPipelineHead() { + // do nothing + } + + @Override + protected void execute4(final CommittableQueue<T> elements) { + // TODO Auto-generated method stub + + } + + @Override + protected void execute5(final T element) { + // TODO Auto-generated method stub + + } + +}