diff --git a/results/overhead-findings.txt b/results/overhead-findings.txt index a0da29fd0e633fc95009a70b155613ac70acb519..8964457ec62b716e6c3b9cb3e767ebfb467ba1c0 100644 --- a/results/overhead-findings.txt +++ b/results/overhead-findings.txt @@ -24,6 +24,8 @@ 11: 8600 ns (executeWithPorts: fixed sized pipe; with CircularArray(int)) 11: 8200 ns (executeWithPorts: fixed sized pipe; with CircularArray(int) w/o mask) 11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read) + 11: 8200 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read; non-final elements) + 11: 7000 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read; non-final elements; pipeline searches for firstStageIndex) 12: 3300 ns (recursive; argument/return w/o pipe) 13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class) 14: 21,000 ns (spsc pipe) diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis15Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis15Test.java new file mode 100644 index 0000000000000000000000000000000000000000..fec03aa93538cb18de1819ff265a2e199d4512ca --- /dev/null +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis15Test.java @@ -0,0 +1,74 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Before; +import org.junit.Test; + +import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis15; +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +import kieker.common.logging.LogFactory; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThoughputTimestampAnalysis15Test { + + private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; + + @Before + public void before() { + System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE"); + } + + @Test + public void testWithManyObjects() { + System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); + final StopWatch stopWatch = new StopWatch(); + final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE); + + final MethodCallThroughputAnalysis15 analysis = new MethodCallThroughputAnalysis15(); + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); + analysis.setTimestampObjects(timestampObjects); + analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { + @Override + public TimestampObject call() throws Exception { + return new TimestampObject(); + } + }); + analysis.init(); + + stopWatch.start(); + try { + analysis.start(); + } finally { + stopWatch.end(); + } + + StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects); + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java index 6f5f00edd85a5dd030618eb22934f6e11540c020..710b6afef21da63dc3f4f6ef2ec639243180bfd8 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java @@ -1,5 +1,6 @@ package teetime.examples.throughput.methodcall; +import teetime.examples.throughput.methodcall.stage.AbstractStage; import teetime.util.list.CommittableQueue; public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { @@ -33,4 +34,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { // } } + @Override + public void onIsPipelineHead() { + // do nothing + } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java b/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java index f1288355cdfc3723719940985930a25380823429..ecec0c0d925d35ea0156140aab723ab6ef8d2bc5 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/FixedSizedPipe.java @@ -27,12 +27,18 @@ public class FixedSizedPipe<T> implements IPipe<T> { // } // // } + private final int MIN_CAPACITY; - @SuppressWarnings("unchecked") - private final T[] elements = (T[]) new Object[4]; + private T[] elements; // private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2); private int lastFreeIndex; + @SuppressWarnings("unchecked") + public FixedSizedPipe() { + this.MIN_CAPACITY = 4; + this.elements = (T[]) new Object[this.MIN_CAPACITY]; + } + public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new FixedSizedPipe<T>(); sourcePort.pipe = pipe; @@ -43,13 +49,7 @@ public class FixedSizedPipe<T> implements IPipe<T> { public void add(final T element) { if (this.lastFreeIndex == this.elements.length) { // if (this.lastFreeIndex == this.elements.getCapacity()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - // grow + this.elements = this.grow(); } this.elements[this.lastFreeIndex++] = element; // this.elements.put(this.lastFreeIndex++, element); @@ -57,8 +57,9 @@ public class FixedSizedPipe<T> implements IPipe<T> { @Override public T removeLast() { - return this.elements[--this.lastFreeIndex]; - // return this.elements.get(--this.lastFreeIndex); + T element = this.elements[--this.lastFreeIndex]; + // T element = this.elements.get(--this.lastFreeIndex); + return element; } @Override @@ -77,4 +78,24 @@ public class FixedSizedPipe<T> implements IPipe<T> { return this.lastFreeIndex; } + private T[] grow() { + int newSize = this.elements.length * 2; + return this.newArray(newSize); + } + + // we do not support shrink since it causes too much overhead due to the capacity checks + // private T[] shrink() { + // int newSize = this.elements.length / 2; + // return this.newArray(newSize); + // } + + private T[] newArray(final int newSize) { + @SuppressWarnings("unchecked") + T[] newElements = (T[]) new Object[newSize]; + + System.arraycopy(this.elements, 0, newElements, 0, this.elements.length); + + return newElements; + } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis13.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis13.java index 1fb71dbf819385998ae6f47e0866584e976b9c3b..dd1e6d566094db587eeda3266a39a99503854655 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis13.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis13.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.Callable; import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.AbstractStage; import teetime.examples.throughput.methodcall.stage.CollectorSink; import teetime.examples.throughput.methodcall.stage.EndStage; import teetime.examples.throughput.methodcall.stage.NoopFilter; diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java new file mode 100644 index 0000000000000000000000000000000000000000..97604c13e984db4d11d2ba8e854846392eec1cef --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis15.java @@ -0,0 +1,166 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput.methodcall; + +import java.util.List; +import java.util.concurrent.Callable; + +import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.Clock; +import teetime.examples.throughput.methodcall.stage.CollectorSink; +import teetime.examples.throughput.methodcall.stage.Delay; +import teetime.examples.throughput.methodcall.stage.EndStage; +import teetime.examples.throughput.methodcall.stage.NoopFilter; +import teetime.examples.throughput.methodcall.stage.ObjectProducer; +import teetime.examples.throughput.methodcall.stage.Pipeline; +import teetime.examples.throughput.methodcall.stage.StartTimestampFilter; +import teetime.examples.throughput.methodcall.stage.StopTimestampFilter; +import teetime.framework.core.Analysis; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThroughputAnalysis15 extends Analysis { + + private long numInputObjects; + private Callable<TimestampObject> inputObjectCreator; + private int numNoopFilters; + private List<TimestampObject> timestampObjects; + + private Runnable clockRunnable; + private Runnable runnable; + private Clock clock; + + @Override + public void init() { + super.init(); + + this.clockRunnable = this.buildClockPipeline(); + this.runnable = this.buildPipeline(this.clock); + } + + private Runnable buildClockPipeline() { + this.clock = new Clock(); + + this.clock.setInitialDelayInMs(100); + this.clock.setIntervalDelayInMs(100); + + final Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(this.clock); + pipeline.setLastStage(new EndStage<Long>()); + + pipeline.onStart(); + + final Runnable runnable = new Runnable() { + @Override + public void run() { + do { + pipeline.executeWithPorts(); + } while (pipeline.isReschedulable()); + } + }; + + return runnable; + } + + /** + * @param numNoopFilters + * @since 1.10 + */ + private Runnable buildPipeline(final Clock clock) { + @SuppressWarnings("unchecked") + final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; + // create stages + final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator); + Delay<TimestampObject> delay = new Delay<TimestampObject>(); + final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); + for (int i = 0; i < noopFilters.length; i++) { + noopFilters[i] = new NoopFilter<TimestampObject>(); + } + final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); + + final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>(); + pipeline.setFirstStage(objectProducer); + pipeline.addIntermediateStage(startTimestampFilter); + pipeline.addIntermediateStages(noopFilters); + pipeline.addIntermediateStage(stopTimestampFilter); + pipeline.addIntermediateStage(delay); + pipeline.setLastStage(collectorSink); + + SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + + FixedSizedPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + FixedSizedPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + for (int i = 0; i < noopFilters.length - 1; i++) { + FixedSizedPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + } + FixedSizedPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + FixedSizedPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); + + FixedSizedPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); + + pipeline.onStart(); + + // pipeline.getInputPort().pipe = new Pipe<Void>(); + // pipeline.getInputPort().pipe.add(new Object()); + + // pipeline.getOutputPort().pipe = new Pipe<Void>(); + + final Runnable runnable = new Runnable() { + @Override + public void run() { + do { + pipeline.executeWithPorts(); + } while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable()); + } + }; + + return runnable; + } + + @Override + public void start() { + super.start(); + Thread clockThread = new Thread(this.clockRunnable); + clockThread.start(); + this.runnable.run(); + clockThread.interrupt(); + } + + public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) { + this.numInputObjects = numInputObjects; + this.inputObjectCreator = inputObjectCreator; + } + + public int getNumNoopFilters() { + return this.numNoopFilters; + } + + public void setNumNoopFilters(final int numNoopFilters) { + this.numNoopFilters = numNoopFilters; + } + + public List<TimestampObject> getTimestampObjects() { + return this.timestampObjects; + } + + public void setTimestampObjects(final List<TimestampObject> timestampObjects) { + this.timestampObjects = timestampObjects; + } +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis7.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis7.java index 7f865f5a8c16c0a8e56d2ea7346b4efdf1d09fba..ac22b177bf70b028a73b386b8b603877eb2cac1d 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis7.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis7.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.Callable; import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.AbstractStage; import teetime.examples.throughput.methodcall.stage.CollectorSink; import teetime.examples.throughput.methodcall.stage.NoopFilter; import teetime.examples.throughput.methodcall.stage.ObjectProducer; diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java index 7f458284cade9b127f02209f2c3c2f76d9164e1a..5de987ca64fabbee7da70d30426b5b9980158a16 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.Callable; import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.AbstractStage; import teetime.examples.throughput.methodcall.stage.CollectorSink; import teetime.examples.throughput.methodcall.stage.NoopFilter; import teetime.examples.throughput.methodcall.stage.ObjectProducer; diff --git a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java index 8ce842844cdf1a8406dfb0cc7c037dd09f5e2654..27faf320bd5f750f337ecd56f15533ca60ab7dd1 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java @@ -1,5 +1,6 @@ package teetime.examples.throughput.methodcall; +import teetime.examples.throughput.methodcall.stage.AbstractStage; import teetime.util.list.CommittableQueue; public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { @@ -29,4 +30,9 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { // } } + @Override + public void onIsPipelineHead() { + // do nothing + } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java index 7ed18326641824d23cdb75e8714b47e04bb9a8cb..06b95eb4a59f02ad6db1edb277d7b0c828328233 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java @@ -28,4 +28,6 @@ public interface Stage<I, O> { boolean isReschedulable(); + void onIsPipelineHead(); + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java similarity index 87% rename from src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java rename to src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java index 42b55b262c7c33fde360ead73a86c81543129c4e..14881d714ac0fad728babac07082f6f8b43c6eff 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/AbstractStage.java @@ -1,9 +1,15 @@ -package teetime.examples.throughput.methodcall; - +package teetime.examples.throughput.methodcall.stage; + +import teetime.examples.throughput.methodcall.InputPort; +import teetime.examples.throughput.methodcall.OnDisableListener; +import teetime.examples.throughput.methodcall.OutputPort; +import teetime.examples.throughput.methodcall.SchedulingInformation; +import teetime.examples.throughput.methodcall.Stage; +import teetime.examples.throughput.methodcall.StageWithPort; import teetime.util.list.CommittableQueue; import teetime.util.list.CommittableResizableArrayQueue; -abstract class AbstractStage<I, O> implements StageWithPort<I, O> { +public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { private final InputPort<I> inputPort = new InputPort<I>(); private final OutputPort<O> outputPort = new OutputPort<O>(); @@ -98,6 +104,7 @@ abstract class AbstractStage<I, O> implements StageWithPort<I, O> { // execute = this.next().execute2(this.outputElements); // execute = this.next().execute2(this.getOutputPort().pipe.getElements()); this.next().executeWithPorts(); + // System.out.println("Executed " + this.next().getClass().getSimpleName()); } while (this.next().isReschedulable()); // } while (this.next().getInputPort().pipe.size() > 0); // } while (execute.size() > 0); diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Clock.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Clock.java new file mode 100644 index 0000000000000000000000000000000000000000..0381e75851140f097d3c71f9c07db6e20d5c91d9 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Clock.java @@ -0,0 +1,66 @@ +package teetime.examples.throughput.methodcall.stage; + +import teetime.examples.throughput.methodcall.ProducerStage; +import teetime.util.list.CommittableQueue; + +public class Clock extends ProducerStage<Void, Long> { + + private boolean initialDelayExceeded = false; + + private long initialDelayInMs; + private long intervalDelayInMs; + + @Override + public Long execute(final Object element) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected void execute4(final CommittableQueue<Void> elements) { + // TODO Auto-generated method stub + + } + + @Override + protected void execute5(final Void element) { + if (!this.initialDelayExceeded) { + this.initialDelayExceeded = true; + this.sleep(this.initialDelayInMs); + } else { + this.sleep(this.intervalDelayInMs); + } + + // System.out.println("Emitting timestamp"); + this.send(this.getCurrentTimeInNs()); + } + + private void sleep(final long delayInMs) { + try { + Thread.sleep(delayInMs); + } catch (InterruptedException e) { + this.setReschedulable(false); + } + } + + private long getCurrentTimeInNs() { + return System.nanoTime(); + } + + public long getInitialDelayInMs() { + return this.initialDelayInMs; + } + + public void setInitialDelayInMs(final long initialDelayInMs) { + this.initialDelayInMs = initialDelayInMs; + } + + public long getIntervalDelayInMs() { + return this.intervalDelayInMs; + } + + public void setIntervalDelayInMs(final long intervalDelayInMs) { + this.intervalDelayInMs = intervalDelayInMs; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java b/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java index 95963d8c7e9aae3a340f7319d64231b1d7b8aef0..6d844a02dec40403b5b9162d23f396b346d8e30a 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/CollectorSink.java @@ -67,6 +67,10 @@ public class CollectorSink<T> extends ConsumerStage<T, Object> { if ((this.elements.size() % THRESHOLD) == 0) { System.out.println("size: " + this.elements.size()); } + + if (this.elements.size() > 90000) { + // System.out.println("size > 90000: " + this.elements.size()); + } } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java new file mode 100644 index 0000000000000000000000000000000000000000..870ff604bf097342a2b0a14875f01800c1368ce6 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Delay.java @@ -0,0 +1,60 @@ +package teetime.examples.throughput.methodcall.stage; + +import teetime.examples.throughput.methodcall.InputPort; +import teetime.util.list.CommittableQueue; + +public class Delay<I> extends AbstractStage<I, I> { + + private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(); + + public Delay() { + // this.setReschedulable(true); + } + + @Override + public void executeWithPorts() { + Long timestampTrigger = this.timestampTriggerInputPort.receive(); + if (null == timestampTrigger) { + return; + } + // System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size()); + + // System.out.println("#elements: " + this.getInputPort().pipe.size()); + // TODO implement receiveAll() and sendMultiple() + while (!this.getInputPort().pipe.isEmpty()) { + I element = this.getInputPort().receive(); + this.send(element); + } + + this.setReschedulable(this.getInputPort().pipe.size() > 0); + // System.out.println("delay: " + this.getInputPort().pipe.size()); + } + + @Override + public void onIsPipelineHead() { + this.setReschedulable(true); + } + + @Override + public I execute(final Object element) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected void execute4(final CommittableQueue<I> elements) { + // TODO Auto-generated method stub + + } + + @Override + protected void execute5(final I element) { + // TODO Auto-generated method stub + + } + + public InputPort<Long> getTimestampTriggerInputPort() { + return this.timestampTriggerInputPort; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java index 6f960eb892daef0ab051f972bbd6529f2d291fe4..7fe7b31705716732ee27d6bb3440ca4bc10b6708 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/EndStage.java @@ -20,6 +20,11 @@ public class EndStage<T> implements StageWithPort<T, T> { return (T) element; } + @Override + public void onIsPipelineHead() { + // do nothing + } + @Override public CommittableQueue<T> execute2(final CommittableQueue<T> elements) { // TODO Auto-generated method stub diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java b/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java index d6093bbe0806d26ceb1614f0ae244908be8ae0f1..c31be8165a9f528e9caa337c31de8f66d88afaa0 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/ObjectProducer.java @@ -103,7 +103,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { try { final T newObject = this.inputObjectCreator.call(); this.numInputObjects--; - + // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); this.send(newObject); } catch (final Exception e) { throw new IllegalStateException(e); diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java index 4bb2d03a70d13eea3dc124367601895ba7ac1b2b..87926f5698e4a9251eed0f8f99005646e071696a 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Pipeline.java @@ -27,6 +27,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { private OnDisableListener listener; private boolean reschedulable; + private int firstStageIndex; public void setFirstStage(final StageWithPort<I, ?> stage) { this.firstStage = stage; @@ -67,9 +68,30 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener { @Override public void executeWithPorts() { - this.stages[0].executeWithPorts(); + StageWithPort firstStage = this.stages[this.firstStageIndex]; + firstStage.executeWithPorts(); + + this.updateRescheduable(firstStage); + // this.setReschedulable(stage.isReschedulable()); + } + + private final void updateRescheduable(Stage stage) { + while (!stage.isReschedulable()) { + this.firstStageIndex++; + stage = stage.next(); + if (stage == null) { // loop reaches the last stage + this.setReschedulable(false); + return; + } + stage.onIsPipelineHead(); + // System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName()); + } + this.setReschedulable(true); + } - this.setReschedulable(this.stages[0].isReschedulable()); + @Override + public void onIsPipelineHead() { + // do nothing } @Override