From 487bece8f6f1e703d81bba024ef0ac73757761d0 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Thu, 18 Dec 2014 15:03:00 +0100
Subject: [PATCH] added notify and interrupt in SpScPipe

---
 .../framework/AbstractInterThreadPipe.java    |  6 ++++
 .../java/teetime/framework/AbstractStage.java |  5 +--
 src/main/java/teetime/framework/Analysis.java |  3 ++
 .../framework/RunnableConsumerStage.java      |  4 +--
 src/main/java/teetime/framework/Stage.java    | 12 +++++++
 .../framework/pipe/PipeFactoryLoader.java     |  2 +-
 .../java/teetime/framework/pipe/SpScPipe.java |  8 +++++
 .../teetime/stage/io/EveryXthPrinter.java     |  5 +++
 .../MethodCallThroughputAnalysis15.java       | 26 ++++++++++-----
 .../MethodCallThroughputAnalysis16.java       | 33 +++++++++++++------
 .../java/teetime/framework/OldPipeline.java   | 15 +++++++++
 .../teetime/framework/pipe/SpScPipeTest.java  |  2 +-
 12 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index 98021089..847d10ef 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -1,5 +1,6 @@
 package teetime.framework;
 
+import java.lang.Thread.State;
 import java.util.Queue;
 
 import org.jctools.queues.QueueFactory;
@@ -20,6 +21,11 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 	@Override
 	public void sendSignal(final ISignal signal) {
 		this.signalQueue.offer(signal);
+
+		Thread owningThread = cachedTargetStage.getOwningThread();
+		if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) {
+			owningThread.interrupt();
+		}
 	}
 
 	/**
diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 06b1b772..bfcfe993 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -16,7 +16,7 @@ public abstract class AbstractStage extends Stage {
 	private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
 
 	/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
-	protected InputPort<?>[] cachedInputPorts;
+	protected InputPort<?>[] cachedInputPorts = new InputPort[0];
 	/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
 	protected OutputPort<?>[] cachedOutputPorts;
 
@@ -35,7 +35,8 @@ public abstract class AbstractStage extends Stage {
 	/**
 	 * @return the stage's input ports
 	 */
-	protected InputPort<?>[] getInputPorts() {
+	@Override
+	public InputPort<?>[] getInputPorts() {
 		return this.cachedInputPorts;
 	}
 
diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java
index 9645269e..3f14c1aa 100644
--- a/src/main/java/teetime/framework/Analysis.java
+++ b/src/main/java/teetime/framework/Analysis.java
@@ -73,16 +73,19 @@ public class Analysis implements UncaughtExceptionHandler {
 			switch (stage.getTerminationStrategy()) {
 			case BY_SIGNAL: {
 				final Thread thread = new Thread(new RunnableConsumerStage(stage));
+				stage.setOwningThread(thread);
 				this.consumerThreads.add(thread);
 				break;
 			}
 			case BY_SELF_DECISION: {
 				final Thread thread = new Thread(new RunnableProducerStage(stage));
+				stage.setOwningThread(thread);
 				this.finiteProducerThreads.add(thread);
 				break;
 			}
 			case BY_INTERRUPT: {
 				final Thread thread = new Thread(new RunnableProducerStage(stage));
+				stage.setOwningThread(thread);
 				this.infiniteProducerThreads.add(thread);
 				break;
 			}
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index 27e5e1d9..bfdefd7b 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -42,8 +42,8 @@ public final class RunnableConsumerStage extends RunnableStage {
 	}
 
 	private void checkforSignals() {
-		// FIXME consider to use AbstractStage or to move getInputPorts() to Stage or...
-		InputPort<?>[] inputPorts = ((AbstractStage) stage).getInputPorts();
+		// FIXME should getInputPorts() really be defined in Stage?
+		InputPort<?>[] inputPorts = stage.getInputPorts();
 		for (InputPort<?> inputPort : inputPorts) {
 			IPipe pipe = inputPort.getPipe();
 			if (pipe instanceof AbstractInterThreadPipe) {
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index 561f7623..96e16370 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -26,6 +26,8 @@ public abstract class Stage {
 	@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
 	protected final Logger logger;
 
+	private Thread owningThread;
+
 	protected Stage() {
 		this.id = this.createId();
 		this.logger = LoggerFactory.getLogger(this.id);
@@ -82,4 +84,14 @@ public abstract class Stage {
 	protected abstract void terminate();
 
 	protected abstract boolean shouldBeTerminated();
+
+	public Thread getOwningThread() {
+		return owningThread;
+	}
+
+	public void setOwningThread(final Thread owningThread) {
+		this.owningThread = owningThread;
+	}
+
+	protected abstract InputPort<?>[] getInputPorts();
 }
diff --git a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java
index 6fcbda81..1335c51a 100644
--- a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java
+++ b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java
@@ -38,7 +38,7 @@ public final class PipeFactoryLoader {
 						pipeFactories.add(pipeFactory);
 					}
 				} catch (ClassNotFoundException e) {
-					LOGGER.warn("Could not find class: " + line, e);
+					LOGGER.warn("Could not find class: " + line, e); // NOMPD (PMD.GuardLogStatement)
 				} catch (InstantiationException e) {
 					LOGGER.warn("Could not instantiate pipe factory", e);
 				} catch (IllegalAccessException e) {
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 8facbf44..f32e2fbe 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -1,5 +1,6 @@
 package teetime.framework.pipe;
 
+import java.lang.Thread.State;
 import java.util.Queue;
 
 import org.jctools.queues.QueueFactory;
@@ -37,6 +38,13 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 			Thread.yield();
 		}
 
+		Thread owningThread = cachedTargetStage.getOwningThread();
+		if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) {
+			synchronized (cachedTargetStage) {
+				cachedTargetStage.notify();
+			}
+		}
+
 		return true;
 	}
 
diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java
index aacc5936..e76e71d2 100644
--- a/src/main/java/teetime/stage/io/EveryXthPrinter.java
+++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java
@@ -70,4 +70,9 @@ public final class EveryXthPrinter<T> extends Stage {
 		return distributor.getNewOutputPort();
 	}
 
+	@Override
+	protected InputPort<?>[] getInputPorts() {
+		return distributor.getInputPorts();
+	}
+
 }
diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
index 1dda610e..fcde4393 100644
--- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -17,11 +17,14 @@ package teetime.examples.experiment15;
 
 import java.util.List;
 
-import teetime.framework.Stage;
+import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OldHeadPipeline;
 import teetime.framework.RunnableProducerStage;
+import teetime.framework.Stage;
+import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
-import teetime.framework.pipe.SingleElementPipe;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
 import teetime.stage.CollectorSink;
@@ -39,11 +42,13 @@ import teetime.util.TimestampObject;
  *
  * @since 1.10
  */
-public class MethodCallThroughputAnalysis15 {
+public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
 	// FIXME this analysis sometimes runs infinitely
 
 	private static final int SPSC_INITIAL_CAPACITY = 4;
 
+	private final IPipeFactory intraThreadPipeFactory;
+
 	private int numInputObjects;
 	private ConstructorClosure<TimestampObject> inputObjectCreator;
 	private int numNoopFilters;
@@ -53,8 +58,11 @@ public class MethodCallThroughputAnalysis15 {
 	private Runnable runnable;
 	private Clock clock;
 
-	public void init() {
+	public MethodCallThroughputAnalysis15() {
+		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+	}
 
+	public void init() {
 		OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
 		this.clockRunnable = new RunnableProducerStage(clockPipeline);
 
@@ -99,15 +107,15 @@ public class MethodCallThroughputAnalysis15 {
 
 		SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
 
-		SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
-		SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+		intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+			intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
 		OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
 
-		SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
+		intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
index 167c3f97..5ea97ae6 100644
--- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
+++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
@@ -19,9 +19,13 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OldHeadPipeline;
+import teetime.framework.RunnableConsumerStage;
 import teetime.framework.RunnableProducerStage;
-import teetime.framework.pipe.SingleElementPipe;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -30,6 +34,7 @@ import teetime.stage.Relay;
 import teetime.stage.StartTimestampFilter;
 import teetime.stage.StopTimestampFilter;
 import teetime.stage.basic.distributor.Distributor;
+import teetime.stage.io.EveryXthPrinter;
 import teetime.util.ConstructorClosure;
 import teetime.util.TimestampObject;
 
@@ -38,11 +43,13 @@ import teetime.util.TimestampObject;
  *
  * @since 1.10
  */
-public class MethodCallThroughputAnalysis16 {
+public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration {
 
 	private static final int SPSC_INITIAL_CAPACITY = 100100;
 	private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
 
+	private final IPipeFactory intraThreadPipeFactory;
+
 	private int numInputObjects;
 	private ConstructorClosure<TimestampObject> inputObjectCreator;
 	private int numNoopFilters;
@@ -55,6 +62,10 @@ public class MethodCallThroughputAnalysis16 {
 
 	private int numWorkerThreads;
 
+	public MethodCallThroughputAnalysis16() {
+		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+	}
+
 	public void init() {
 		OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
@@ -68,7 +79,8 @@ public class MethodCallThroughputAnalysis16 {
 			this.timestampObjectsList.add(resultList);
 
 			OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
-			this.workerThreads[i] = new Thread(new RunnableProducerStage(workerPipeline));
+			this.workerThreads[i] = new Thread(new RunnableConsumerStage(workerPipeline));
+			workerPipeline.setOwningThread(this.workerThreads[i]);
 		}
 	}
 
@@ -81,7 +93,7 @@ public class MethodCallThroughputAnalysis16 {
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(distributor);
 
-		SingleElementPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
+		intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort());
 
 		return pipeline;
 	}
@@ -102,6 +114,7 @@ public class MethodCallThroughputAnalysis16 {
 			noopFilters[i] = new NoopFilter<TimestampObject>();
 		}
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
+		EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000);
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
 		final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
@@ -110,20 +123,20 @@ public class MethodCallThroughputAnalysis16 {
 
 		SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
 
-		SingleElementPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
+		intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort());
 
-		SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+			intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
-		SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
+		intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
+		intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
 
 	public void start() {
-
 		this.producerThread.start();
 
 		for (Thread workerThread : this.workerThreads) {
diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java
index 56c8fffd..900de2a1 100644
--- a/src/performancetest/java/teetime/framework/OldPipeline.java
+++ b/src/performancetest/java/teetime/framework/OldPipeline.java
@@ -57,4 +57,19 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
 		return firstStage.shouldBeTerminated();
 	}
 
+	@Override
+	protected InputPort<?>[] getInputPorts() {
+		return firstStage.getInputPorts();
+	}
+
+	@Override
+	public void setOwningThread(final Thread owningThread) {
+		firstStage.setOwningThread(owningThread);
+	}
+
+	@Override
+	public Thread getOwningThread() {
+		return firstStage.getOwningThread();
+	}
+
 }
diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java
index 20d366d6..2600789e 100644
--- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java
+++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java
@@ -18,7 +18,7 @@ public class SpScPipeTest {
 
 	@Test
 	public void testSignalOrdering() throws Exception {
-		OutputPort<? extends Object> sourcePort = null;
+		OutputPort<Object> sourcePort = null;
 		InputPort<Object> targetPort = null;
 		AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
 
-- 
GitLab