diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 980210899354588747cd2339406db559a0be709b..847d10ef6ea868cceaa0d60ea5b1db70061784c1 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -1,5 +1,6 @@ package teetime.framework; +import java.lang.Thread.State; import java.util.Queue; import org.jctools.queues.QueueFactory; @@ -20,6 +21,11 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { @Override public void sendSignal(final ISignal signal) { this.signalQueue.offer(signal); + + Thread owningThread = cachedTargetStage.getOwningThread(); + if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + owningThread.interrupt(); + } } /** diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 06b1b77210a2b3bad5831307cbf3e482d792ca12..bfcfe9934b91a345980746a0366836fc9db3fd4c 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -16,7 +16,7 @@ public abstract class AbstractStage extends Stage { private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ - protected InputPort<?>[] cachedInputPorts; + protected InputPort<?>[] cachedInputPorts = new InputPort[0]; /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; @@ -35,7 +35,8 @@ public abstract class AbstractStage extends Stage { /** * @return the stage's input ports */ - protected InputPort<?>[] getInputPorts() { + @Override + public InputPort<?>[] getInputPorts() { return this.cachedInputPorts; } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 9645269eaf9f1551bbb4d9e4e8eaa5653b3a659e..3f14c1aaa9883bde900f352846a42f5973f403a3 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -73,16 +73,19 @@ public class Analysis implements UncaughtExceptionHandler { switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { final Thread thread = new Thread(new RunnableConsumerStage(stage)); + stage.setOwningThread(thread); this.consumerThreads.add(thread); break; } case BY_SELF_DECISION: { final Thread thread = new Thread(new RunnableProducerStage(stage)); + stage.setOwningThread(thread); this.finiteProducerThreads.add(thread); break; } case BY_INTERRUPT: { final Thread thread = new Thread(new RunnableProducerStage(stage)); + stage.setOwningThread(thread); this.infiniteProducerThreads.add(thread); break; } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 27e5e1d9d99a6efabc87546abad44b347c99596a..bfdefd7b9d49b6aa826091b95db7c5131e16e802 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -42,8 +42,8 @@ public final class RunnableConsumerStage extends RunnableStage { } private void checkforSignals() { - // FIXME consider to use AbstractStage or to move getInputPorts() to Stage or... - InputPort<?>[] inputPorts = ((AbstractStage) stage).getInputPorts(); + // FIXME should getInputPorts() really be defined in Stage? + InputPort<?>[] inputPorts = stage.getInputPorts(); for (InputPort<?> inputPort : inputPorts) { IPipe pipe = inputPort.getPipe(); if (pipe instanceof AbstractInterThreadPipe) { diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 561f76238327c2eb162014c3d3446f3533ec0f56..96e16370f82bdb3bcfd82f25ab19886b5450127b 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -26,6 +26,8 @@ public abstract class Stage { @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; + private Thread owningThread; + protected Stage() { this.id = this.createId(); this.logger = LoggerFactory.getLogger(this.id); @@ -82,4 +84,14 @@ public abstract class Stage { protected abstract void terminate(); protected abstract boolean shouldBeTerminated(); + + public Thread getOwningThread() { + return owningThread; + } + + public void setOwningThread(final Thread owningThread) { + this.owningThread = owningThread; + } + + protected abstract InputPort<?>[] getInputPorts(); } diff --git a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java index 6fcbda81d8ff073ca27a9d2863b4ec02ad6f4688..1335c51a97bd15eeece300f8e045d8950ea52c40 100644 --- a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java +++ b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java @@ -38,7 +38,7 @@ public final class PipeFactoryLoader { pipeFactories.add(pipeFactory); } } catch (ClassNotFoundException e) { - LOGGER.warn("Could not find class: " + line, e); + LOGGER.warn("Could not find class: " + line, e); // NOMPD (PMD.GuardLogStatement) } catch (InstantiationException e) { LOGGER.warn("Could not instantiate pipe factory", e); } catch (IllegalAccessException e) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 8facbf44a72c4aa96087f8fe2372d54a6c7d7a7f..f32e2fbec45041edf94959d497437376783269f2 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import java.lang.Thread.State; import java.util.Queue; import org.jctools.queues.QueueFactory; @@ -37,6 +38,13 @@ public final class SpScPipe extends AbstractInterThreadPipe { Thread.yield(); } + Thread owningThread = cachedTargetStage.getOwningThread(); + if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + synchronized (cachedTargetStage) { + cachedTargetStage.notify(); + } + } + return true; } diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index aacc5936acd18870ab2215ffe859ac6398e5139d..e76e71d23ebc6c96418068cc56b88a6dc95d7af4 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -70,4 +70,9 @@ public final class EveryXthPrinter<T> extends Stage { return distributor.getNewOutputPort(); } + @Override + protected InputPort<?>[] getInputPorts() { + return distributor.getInputPorts(); + } + } diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index 1dda610e43d5eccdffb0a4440c8baf5479cacac6..fcde43930f195403b1e9f759733e706de6857dd4 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -17,11 +17,14 @@ package teetime.examples.experiment15; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; +import teetime.framework.Stage; +import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.OrderedGrowableArrayPipe; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -39,11 +42,13 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis15 { +public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { // FIXME this analysis sometimes runs infinitely private static final int SPSC_INITIAL_CAPACITY = 4; + private final IPipeFactory intraThreadPipeFactory; + private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; @@ -53,8 +58,11 @@ public class MethodCallThroughputAnalysis15 { private Runnable runnable; private Clock clock; - public void init() { + public MethodCallThroughputAnalysis15() { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + } + public void init() { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); this.clockRunnable = new RunnableProducerStage(clockPipeline); @@ -99,15 +107,15 @@ public class MethodCallThroughputAnalysis15 { SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); - SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); - SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); + intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort()); return pipeline; } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java index 167c3f9709400a03eb9563d0fb8a21d8bfbf16d6..5ea97ae65635c950bd1ecdea8b9b76bd95d3824e 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java @@ -19,9 +19,13 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; +import teetime.framework.RunnableConsumerStage; import teetime.framework.RunnableProducerStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -30,6 +34,7 @@ import teetime.stage.Relay; import teetime.stage.StartTimestampFilter; import teetime.stage.StopTimestampFilter; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.io.EveryXthPrinter; import teetime.util.ConstructorClosure; import teetime.util.TimestampObject; @@ -38,11 +43,13 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis16 { +public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); + private final IPipeFactory intraThreadPipeFactory; + private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; @@ -55,6 +62,10 @@ public class MethodCallThroughputAnalysis16 { private int numWorkerThreads; + public MethodCallThroughputAnalysis16() { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + } + public void init() { OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); @@ -68,7 +79,8 @@ public class MethodCallThroughputAnalysis16 { this.timestampObjectsList.add(resultList); OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(new RunnableProducerStage(workerPipeline)); + this.workerThreads[i] = new Thread(new RunnableConsumerStage(workerPipeline)); + workerPipeline.setOwningThread(this.workerThreads[i]); } } @@ -81,7 +93,7 @@ public class MethodCallThroughputAnalysis16 { pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); - SingleElementPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } @@ -102,6 +114,7 @@ public class MethodCallThroughputAnalysis16 { noopFilters[i] = new NoopFilter<TimestampObject>(); } final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); @@ -110,20 +123,20 @@ public class MethodCallThroughputAnalysis16 { SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); - SingleElementPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort()); - SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); return pipeline; } public void start() { - this.producerThread.start(); for (Thread workerThread : this.workerThreads) { diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 56c8fffd59bc22e7b2a9efec73b9c83095ed502b..900de2a1871b13bbdf7724a57e7299cad2f646f9 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -57,4 +57,19 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte return firstStage.shouldBeTerminated(); } + @Override + protected InputPort<?>[] getInputPorts() { + return firstStage.getInputPorts(); + } + + @Override + public void setOwningThread(final Thread owningThread) { + firstStage.setOwningThread(owningThread); + } + + @Override + public Thread getOwningThread() { + return firstStage.getOwningThread(); + } + } diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index 20d366d6aea0079c67138c645e373f0f5a4f71e2..2600789edd7994148ab3380403500c397dd423f3 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -18,7 +18,7 @@ public class SpScPipeTest { @Test public void testSignalOrdering() throws Exception { - OutputPort<? extends Object> sourcePort = null; + OutputPort<Object> sourcePort = null; InputPort<Object> targetPort = null; AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method