From 1c315ed8598b4828cf57f5f14023944a0e049db0 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de>
Date: Tue, 19 May 2015 11:18:36 +0200
Subject: [PATCH] added waitForInitializingSignal method

---
 .../teetime/framework/AbstractInterThreadPipe.java     | 10 ++++++++++
 .../teetime/framework/AbstractIntraThreadPipe.java     |  6 ++++++
 src/main/java/teetime/framework/InputPort.java         |  4 ++++
 .../java/teetime/framework/RunnableConsumerStage.java  |  6 +++---
 src/main/java/teetime/framework/pipe/DummyPipe.java    |  5 +++++
 src/main/java/teetime/framework/pipe/IPipe.java        |  2 ++
 .../teetime/stage/basic/merger/MergerTestingPipe.java  |  3 +++
 7 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index 4deee9e7..acb1752e 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering;
 import org.jctools.queues.spec.Preference;
 
 import teetime.framework.signal.ISignal;
+import teetime.framework.signal.InitializingSignal;
 import teetime.util.concurrent.queue.PCBlockingQueue;
 import teetime.util.concurrent.queue.putstrategy.PutStrategy;
 import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
@@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 		cachedTargetStage.onSignal(signal, getTargetPort());
 	}
 
+	@Override
+	public final void waitForInitializingSignal() throws InterruptedException {
+		final ISignal signal = signalQueue.take();
+		if (!(signal instanceof InitializingSignal)) {
+			throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal");
+		}
+		cachedTargetStage.onSignal(signal, getTargetPort());
+	}
+
 	@Override
 	public final boolean isClosed() {
 		return isClosed;
diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
index 92fe284b..ebeccd1c 100644
--- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
@@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 	public void waitForStartSignal() throws InterruptedException {
 		// do nothing
 	}
+
+	@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
+	@Override
+	public void waitForInitializingSignal() throws InterruptedException {
+		// do nothing
+	}
 }
diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java
index 2c2fee6f..f6e2a30d 100644
--- a/src/main/java/teetime/framework/InputPort.java
+++ b/src/main/java/teetime/framework/InputPort.java
@@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> {
 		pipe.waitForStartSignal();
 	}
 
+	public void waitForInitializingSignal() throws InterruptedException {
+		pipe.waitForInitializingSignal();
+	};
+
 }
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index ded489eb..7a1d071d 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -43,14 +43,14 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 	@SuppressWarnings("PMD.GuardLogStatement")
 	@Override
 	protected void beforeStageExecution(final Stage stage) throws InterruptedException {
-		logger.trace("Waiting for start signals..." + inputPorts);
+		logger.trace("Waiting for start signals... " + stage);
 		for (InputPort<?> inputPort : inputPorts) {
-			inputPort.waitForStartSignal();
+			inputPort.waitForInitializingSignal();
 		}
 		for (InputPort<?> inputPort : inputPorts) {
 			inputPort.waitForStartSignal();
 		}
-		logger.trace("Starting..." + stage);
+		logger.trace("Starting... " + stage);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index 23031fe4..2fe5d883 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe {
 
 	}
 
+	@Override
+	public void waitForInitializingSignal() throws InterruptedException {
+
+	}
+
 	@Override
 	public void close() {
 
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index f2fd15ec..183aac72 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -92,6 +92,8 @@ public interface IPipe {
 
 	void waitForStartSignal() throws InterruptedException;
 
+	void waitForInitializingSignal() throws InterruptedException;
+
 	void close();
 
 }
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index 42696bf7..51c82d81 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe {
 	@Override
 	public void waitForStartSignal() throws InterruptedException {}
 
+	@Override
+	public void waitForInitializingSignal() throws InterruptedException {}
+
 	@Override
 	public void close() {}
 
-- 
GitLab