diff --git a/results/overhead-findings.txt b/results/overhead-findings.txt index 8964457ec62b716e6c3b9cb3e767ebfb467ba1c0..4b9897df222e410155c7a7c3ac1f1f93c2399edd 100644 --- a/results/overhead-findings.txt +++ b/results/overhead-findings.txt @@ -10,6 +10,7 @@ -foreach vs. index-based iteration -iterative vs. recursive execution -null-check vs. NullObject +-AbstractPipe vs. IPipe - @@ -29,3 +30,5 @@ 12: 3300 ns (recursive; argument/return w/o pipe) 13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class) 14: 21,000 ns (spsc pipe) +16: 14,500 ns (with distributor thread) +17: 9800 ns (as 16, but with direct feeding of SpScPipe) diff --git a/src/main/java/teetime/framework/core/Analysis.java b/src/main/java/teetime/framework/core/Analysis.java index 4fc4782a79028cffdfa89c075b7182bc515329e4..a5726fab4d6cb4caebbc4e0e15d1d226200deed5 100644 --- a/src/main/java/teetime/framework/core/Analysis.java +++ b/src/main/java/teetime/framework/core/Analysis.java @@ -28,10 +28,10 @@ public class Analysis { } public void start() { - + // System.out.println("Analysis started."); } public void onTerminate() { - + // System.out.println("Analysis stopped."); } } diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java new file mode 100644 index 0000000000000000000000000000000000000000..3fb6736d721a6b8f58d9adba588564d1115fdd13 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis17Test.java @@ -0,0 +1,78 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Before; +import org.junit.Test; + +import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis17; +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +import kieker.common.logging.LogFactory; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThoughputTimestampAnalysis17Test { + + private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; + + @Before + public void before() { + System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE"); + } + + @Test + public void testWithManyObjects() { + System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); + final StopWatch stopWatch = new StopWatch(); + + final MethodCallThroughputAnalysis17 analysis = new MethodCallThroughputAnalysis17(); + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); + analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { + @Override + public TimestampObject call() throws Exception { + return new TimestampObject(); + } + }); + analysis.init(); + + stopWatch.start(); + try { + analysis.start(); + } finally { + stopWatch.end(); + analysis.onTerminate(); + } + + // merge + List<TimestampObject> timestampObjects = analysis.getTimestampObjectsList().get(0); + for (int i = 1; i < analysis.getTimestampObjectsList().size(); i++) { + Collection<? extends TimestampObject> timestampObjectList = analysis.getTimestampObjectsList().get(i); + timestampObjects.addAll(timestampObjectList); + } + StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects); + } +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/AbstractPipe.java b/src/test/java/teetime/examples/throughput/methodcall/AbstractPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..e0a02f44786562d50aaff5d92a453ab812097d7f --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/AbstractPipe.java @@ -0,0 +1,19 @@ +package teetime.examples.throughput.methodcall; + +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class AbstractPipe<T> implements IPipe<T> { + + private final AtomicBoolean closed = new AtomicBoolean(); + + @Override + public boolean isClosed() { + return this.closed.get(); + } + + @Override + public void close() { + this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/IPipe.java b/src/test/java/teetime/examples/throughput/methodcall/IPipe.java index 25ec061b7b899fc86d700f8f5382f76ae7d3eb36..d382daa9f78e2ba4be2f8d04a64f16919afb29ba 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/IPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/IPipe.java @@ -12,4 +12,8 @@ public interface IPipe<T> { public abstract T readLast(); + public abstract void close(); + + public abstract boolean isClosed(); + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/InputPort.java b/src/test/java/teetime/examples/throughput/methodcall/InputPort.java index b41efcbd03ca62b835597955e0a496ab35102b1c..3da7d22b107005085ebc4caa4c9fa266d2a43f20 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/InputPort.java +++ b/src/test/java/teetime/examples/throughput/methodcall/InputPort.java @@ -13,4 +13,5 @@ public class InputPort<T> { T element = this.pipe.readLast(); return element; } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis10.java index 8f16576d14b338aa872293dfa68f3b26ca4a5dd3..0ca0b26fcaee872240599c55b24a90bf22a4263d 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis10.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis10.java @@ -77,23 +77,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis { SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - pipeline.onStart(); - - // pipeline.getInputPort().pipe = new Pipe<Void>(); - // pipeline.getInputPort().pipe.add(new Object()); - - // pipeline.getOutputPort().pipe = new Pipe<Void>(); - - final Runnable runnable = new Runnable() { - @Override - public void run() { - do { - pipeline.executeWithPorts(); - } while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable()); - } - }; - - return runnable; + return new RunnableStage(pipeline); } @Override diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java index d9cade7b627e689d754312eb9f1efa0969cadf41..a0f7fd12e6c4352b4e2e34296cf150d97e77e10b 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis16.java @@ -114,7 +114,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - OrderedGrowableArrayPipe.connect(distributor.getNewOutputPort(), relay.getInputPort()); + SpScPipe.connect(distributor.getNewOutputPort(), relay.getInputPort()); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java new file mode 100644 index 0000000000000000000000000000000000000000..1d41cca7dd303ca9eb5e053f9e1e12ab82fdd654 --- /dev/null +++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis17.java @@ -0,0 +1,175 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.examples.throughput.methodcall; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; + +import teetime.examples.throughput.TimestampObject; +import teetime.examples.throughput.methodcall.stage.CollectorSink; +import teetime.examples.throughput.methodcall.stage.Distributor; +import teetime.examples.throughput.methodcall.stage.NoopFilter; +import teetime.examples.throughput.methodcall.stage.Pipeline; +import teetime.examples.throughput.methodcall.stage.Relay; +import teetime.examples.throughput.methodcall.stage.StartTimestampFilter; +import teetime.examples.throughput.methodcall.stage.StopTimestampFilter; +import teetime.framework.core.Analysis; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MethodCallThroughputAnalysis17 extends Analysis { + + private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); + + private int numInputObjects; + private Callable<TimestampObject> inputObjectCreator; + private int numNoopFilters; + + private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); + + private Distributor<TimestampObject> distributor; + private Thread producerThread; + + private Thread[] workerThreads; + + @Override + public void init() { + super.init(); + // Runnable producerRunnable = this.buildProducerPipeline(); + // this.producerThread = new Thread(producerRunnable); + + int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose + + this.workerThreads = new Thread[numWorkerThreads]; + for (int i = 0; i < this.workerThreads.length; i++) { + List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); + this.timestampObjectsList.add(resultList); + + Runnable workerRunnable = this.buildPipeline(this.distributor, resultList); + this.workerThreads[i] = new Thread(workerRunnable); + } + + // this.producerThread.start(); + // + // try { + // this.producerThread.join(); + // } catch (InterruptedException e1) { + // // TODO Auto-generated catch block + // e1.printStackTrace(); + // } + } + + // private Runnable buildProducerPipeline() { + // final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator); + // this.distributor = new Distributor<TimestampObject>(); + // + // final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>(); + // pipeline.setFirstStage(objectProducer); + // pipeline.setLastStage(this.distributor); + // + // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort()); + // + // return new RunnableStage(pipeline); + // } + + /** + * @param numNoopFilters + * @since 1.10 + */ + private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) { + Relay<TimestampObject> relay = new Relay<TimestampObject>(); + @SuppressWarnings("unchecked") + final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; + // create stages + final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); + for (int i = 0; i < noopFilters.length; i++) { + noopFilters[i] = new NoopFilter<TimestampObject>(); + } + final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); + + final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>(); + pipeline.setFirstStage(relay); + pipeline.addIntermediateStage(startTimestampFilter); + pipeline.addIntermediateStages(noopFilters); + pipeline.addIntermediateStage(stopTimestampFilter); + pipeline.setLastStage(collectorSink); + + IPipe<TimestampObject> startPipe = new SpScPipe<TimestampObject>(); + try { + for (int i = 0; i < this.numInputObjects; i++) { + startPipe.add(this.inputObjectCreator.call()); + } + startPipe.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + relay.getInputPort().pipe = startPipe; + + UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); + + UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + for (int i = 0; i < noopFilters.length - 1; i++) { + UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + } + UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + + return new RunnableStage(pipeline); + } + + @Override + public void start() { + super.start(); + + for (Thread workerThread : this.workerThreads) { + workerThread.start(); + } + + try { + for (Thread workerThread : this.workerThreads) { + workerThread.join(); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) { + this.numInputObjects = numInputObjects; + this.inputObjectCreator = inputObjectCreator; + } + + public int getNumNoopFilters() { + return this.numNoopFilters; + } + + public void setNumNoopFilters(final int numNoopFilters) { + this.numNoopFilters = numNoopFilters; + } + + public List<List<TimestampObject>> getTimestampObjectsList() { + return this.timestampObjectsList; + } + +} diff --git a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java index e23a519d7f0109e1095eb0e7ab8dd8c01f256d2c..ad653410600ee0f881fa7dbd572831953e3a0b06 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowableArrayPipe.java @@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall; import teetime.util.concurrent.workstealing.CircularArray; -public class OrderedGrowableArrayPipe<T> implements IPipe<T> { +public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> { private CircularArray<T> elements; private int head; diff --git a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java index 8cdc34f76fb1469c190fc04952671d22d2667fd1..62f7bf9a0b8a696f390742788b7e1b9fc92d4add 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/OrderedGrowablePipe.java @@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall; import java.util.LinkedList; -public class OrderedGrowablePipe<T> implements IPipe<T> { +public class OrderedGrowablePipe<T> extends AbstractPipe<T> { private LinkedList<T> elements; @@ -22,12 +22,12 @@ public class OrderedGrowablePipe<T> implements IPipe<T> { @Override public void add(final T element) { - this.elements.add(element); + this.elements.offer(element); } @Override public T removeLast() { - return this.elements.removeFirst(); + return this.elements.poll(); } @Override @@ -37,7 +37,7 @@ public class OrderedGrowablePipe<T> implements IPipe<T> { @Override public T readLast() { - return this.elements.getFirst(); + return this.elements.peek(); } @Override diff --git a/src/test/java/teetime/examples/throughput/methodcall/Pipe.java b/src/test/java/teetime/examples/throughput/methodcall/Pipe.java index 2c4309afbb37ecbf50b749d13c04e000f5c2b2cb..e6e9c00eeb184da79f476f73631884eb1bc9185f 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Pipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Pipe.java @@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall; import teetime.util.list.CommittableResizableArrayQueue; -public class Pipe<T> implements IPipe<T> { +public class Pipe<T> extends AbstractPipe<T> { private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); diff --git a/src/test/java/teetime/examples/throughput/methodcall/SingleElementPipe.java b/src/test/java/teetime/examples/throughput/methodcall/SingleElementPipe.java index 82cad708dd0c29eed8f456f57e3d22baf9ff8060..ab6a33de8c381de8664b363e455372989029391d 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/SingleElementPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/SingleElementPipe.java @@ -1,6 +1,7 @@ package teetime.examples.throughput.methodcall; -public class SingleElementPipe<T> implements IPipe<T> { +//public class SingleElementPipe<T> implements IPipe<T> { +public class SingleElementPipe<T> extends AbstractPipe<T> { private T element; @@ -37,4 +38,16 @@ public class SingleElementPipe<T> implements IPipe<T> { return (this.element == null) ? 0 : 1; } + // @Override + // public void close() { + // + // + // } + // + // @Override + // public boolean isClosed() { + // + // return false; + // } + } diff --git a/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java index 37710721d29f3d4c339ca455669f4101068621ca..37380ef64d99f61503decc6345b587d6a4caf240 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java @@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall; import teetime.util.concurrent.spsc.FFBufferOrdered3; -public class SpScPipe<T> implements IPipe<T> { +public class SpScPipe<T> extends AbstractPipe<T> { private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(100010); diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java index 9f183595a15562ab3389517d25ce58c74501bf08..e2f868bbd427640e9ceb0923163c1b576ca809ab 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java +++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java @@ -4,8 +4,6 @@ import teetime.util.list.CommittableQueue; public interface Stage<I, O> { - public static final Object END_SIGNAL = new Object(); - Object executeRecursively(Object element); O execute(Object element); diff --git a/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java index 383145e3e343d7e83bca2ac0b401a9706857ff0d..b2c2edffb9e979772397c51e1672df2f41ae6b5e 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java +++ b/src/test/java/teetime/examples/throughput/methodcall/UnorderedGrowablePipe.java @@ -1,6 +1,6 @@ package teetime.examples.throughput.methodcall; -public class UnorderedGrowablePipe<T> implements IPipe<T> { +public class UnorderedGrowablePipe<T> extends AbstractPipe<T> { // private static final class ArrayWrapper2<T> { // diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java index 894c734ad5b232c604a73f721d004a09931e9340..ca66406587942fcea87b31650796c7ff39a6bc5b 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java @@ -17,7 +17,7 @@ public class Distributor<T> extends ConsumerStage<T, T> { private int size; - private int mask; + // private int mask; @Override public T execute(final Object element) { @@ -40,9 +40,9 @@ public class Distributor<T> extends ConsumerStage<T, T> { @Override public void onIsPipelineHead() { - for (OutputPort op : this.outputPorts) { - op.send(END_SIGNAL); - System.out.println("End signal sent, size: " + op.pipe.size() + ", end signal:" + (op.pipe.readLast() == END_SIGNAL)); + for (OutputPort<?> op : this.outputPorts) { + op.pipe.close(); + System.out.println("End signal sent, size: " + op.pipe.size()); } } diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java index 8cff7774c43a9789195492aa5084c258af0876c3..fe6bc87b9becefefeab6b821b79b34261d38ecb3 100644 --- a/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java +++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Relay.java @@ -12,10 +12,11 @@ public class Relay<T> extends AbstractStage<T, T> { public void executeWithPorts() { T element = this.getInputPort().receive(); if (null == element) { - return; - } else if (END_SIGNAL == element) { - this.setReschedulable(false); - System.out.println("got end signal; pipe.size: " + this.getInputPort().pipe.size()); + if (this.getInputPort().pipe.isClosed()) { + this.setReschedulable(false); + System.out.println("got end signal; pipe.size: " + this.getInputPort().pipe.size()); + return; + } return; }