From fc81705b9c673b5c591faf6dbc36d56e1da90ac0 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de>
Date: Wed, 26 Aug 2015 14:54:14 +0200
Subject: [PATCH] moved declareActive to Stage

---
 .../framework/AbstractCompositeStage.java     | 27 +------------------
 .../java/teetime/framework/Configuration.java |  6 ++---
 src/main/java/teetime/framework/Stage.java    | 25 +++++++++++++++++
 .../java/teetime/framework/ThreadService.java |  2 +-
 .../teetime/framework/test/StageTester.java   |  2 +-
 .../wordcounter/WordCounterConfiguration.java |  6 ++---
 .../framework/AbstractCompositeStageTest.java |  8 +++---
 .../java/teetime/framework/ExecutionTest.java |  6 ++---
 ...unnableConsumerStageTestConfiguration.java |  8 +++---
 .../teetime/framework/TerminationTest.java    |  4 +--
 .../java/teetime/framework/TraverserTest.java |  4 +--
 .../framework/WaitStrategyConfiguration.java  |  6 ++---
 .../framework/YieldStrategyConfiguration.java |  4 +--
 .../ExceptionTestConfiguration.java           |  4 +--
 .../dynamic/DynamicDistributorTest.java       |  4 +--
 .../merger/dynamic/DynamicMergerTest.java     |  2 +-
 16 files changed, 59 insertions(+), 59 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java
index 544d55cb..4a835710 100644
--- a/src/main/java/teetime/framework/AbstractCompositeStage.java
+++ b/src/main/java/teetime/framework/AbstractCompositeStage.java
@@ -32,31 +32,6 @@ public abstract class AbstractCompositeStage {
 	 */
 	private static final int DEFAULT_CAPACITY = 4;
 
-	/**
-	 * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
-	 *
-	 * @param stage
-	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
-	 */
-	protected final void declareActive(final Stage stage) {
-		this.declareActive(stage, stage.getId());
-	}
-
-	/**
-	 * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
-	 *
-	 * @param stage
-	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
-	 * @param threadName
-	 *            A string which can be used for debugging.
-	 */
-	protected void declareActive(final Stage stage, final String threadName) {
-		AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
-		Thread newThread = new TeeTimeThread(runnable, threadName);
-		stage.setOwningThread(newThread);
-		stage.setActive(true);
-	}
-
 	/**
 	 * Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}.
 	 *
@@ -86,7 +61,7 @@ public abstract class AbstractCompositeStage {
 	protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
 			if (sourcePort.getOwningStage().getOwningThread() == null) {
-				declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
+				sourcePort.getOwningStage().declareActive();
 			}
 		}
 
diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java
index fb6270a0..9e6cb968 100644
--- a/src/main/java/teetime/framework/Configuration.java
+++ b/src/main/java/teetime/framework/Configuration.java
@@ -64,10 +64,8 @@ public abstract class Configuration extends AbstractCompositeStage {
 		return factory;
 	}
 
-	@Override
-	protected void declareActive(final Stage stage, final String threadName) {
-		startStage = stage; // memorize an arbitrary stage as starting point for traversing
-		super.declareActive(stage, threadName);
+	protected void registerCustomPipe(final AbstractPipe<?> pipe) {
+		startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index 4a246b59..dcc6b97f 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -191,4 +191,29 @@ public abstract class Stage {
 		this.isActive = isActive;
 	}
 
+	/**
+	 * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
+	 *
+	 * @param stage
+	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
+	 */
+	public void declareActive() {
+		declareActive(getId());
+	}
+
+	/**
+	 * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
+	 *
+	 * @param stage
+	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
+	 * @param threadName
+	 *            A string which can be used for debugging.
+	 */
+	public void declareActive(final String threadName) {
+		AbstractRunnableStage runnable = AbstractRunnableStage.create(this);
+		Thread newThread = new TeeTimeThread(runnable, threadName);
+		this.setOwningThread(newThread);
+		this.setActive(true);
+	}
+
 }
diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java
index 4561e47e..c7656c57 100644
--- a/src/main/java/teetime/framework/ThreadService.java
+++ b/src/main/java/teetime/framework/ThreadService.java
@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> {
 	}
 
 	void startStageAtRuntime(final Stage newStage) {
-		configuration.declareActive(newStage);
+		newStage.declareActive();
 
 		Set<Stage> newThreadableStages = initialize(newStage);
 		startThreads(newThreadableStages);
diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java
index eed58f55..10313c67 100644
--- a/src/main/java/teetime/framework/test/StageTester.java
+++ b/src/main/java/teetime/framework/test/StageTester.java
@@ -91,7 +91,7 @@ public final class StageTester {
 				connectPorts(producer.getOutputPort(), inputHolder.getPort());
 			}
 
-			declareActive(stage);
+			stage.declareActive();
 
 			for (OutputHolder<?> outputHolder : outputHolders) {
 				final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java
index 529a7286..af9e9c3f 100644
--- a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java
+++ b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java
@@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration {
 			connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000);
 			connectPorts(wc.getOutputPort(), merger.getNewInputPort());
 			// Add WordCounter as a threadable stage, so it runs in its own thread
-			declareActive(threadableStage.getInputPort().getOwningStage());
+			threadableStage.getInputPort().getOwningStage().declareActive();
 
 			distributorPorts.add(threadableStage.getInputPort());
 			mergerPorts.add(wc.getOutputPort());
@@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration {
 		connectPorts(merger.getOutputPort(), result.getInputPort());
 
 		// Add the first and last part to the threadable stages
-		declareActive(init);
-		declareActive(merger);
+		init.declareActive();
+		merger.declareActive();
 	}
 
 	public MonitoringThread getMonitoringThread() {
diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java
index 418d9994..8cd72834 100644
--- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java
+++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java
@@ -34,12 +34,12 @@ public class AbstractCompositeStageTest {
 	private class NestedConf extends Configuration {
 
 		private final InitialElementProducer<Object> init;
-		private final Sink sink;
+		private final Sink<Object> sink;
 		private final TestNestingCompositeStage compositeStage;
 
 		public NestedConf() {
 			init = new InitialElementProducer<Object>(new Object());
-			sink = new Sink();
+			sink = new Sink<Object>();
 			compositeStage = new TestNestingCompositeStage();
 			connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort());
 			connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort());
@@ -52,7 +52,7 @@ public class AbstractCompositeStageTest {
 		private final Counter firstCounter = new Counter();
 
 		public TestCompositeOneStage() {
-			declareActive(firstCounter);
+			firstCounter.declareActive();
 		}
 
 	}
@@ -63,7 +63,7 @@ public class AbstractCompositeStageTest {
 		private final Counter secondCounter = new Counter();
 
 		public TestCompositeTwoStage() {
-			declareActive(firstCounter);
+			firstCounter.declareActive();
 			connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort());
 		}
 
diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java
index dcd44505..617ab6a1 100644
--- a/src/test/java/teetime/framework/ExecutionTest.java
+++ b/src/test/java/teetime/framework/ExecutionTest.java
@@ -118,7 +118,7 @@ public class ExecutionTest {
 		public AnalysisTestConfig(final boolean inter) {
 			connectPorts(init.getOutputPort(), sink.getInputPort());
 			if (inter) {
-				declareActive(sink);
+				sink.declareActive();
 			}
 		}
 	}
@@ -143,7 +143,7 @@ public class ExecutionTest {
 			connectPorts(init.getOutputPort(), iof.getInputPort());
 			connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
 			connectPorts(init.createOutputPort(), sink.createInputPort());
-			declareActive(iof);
+			iof.declareActive();
 		}
 	}
 
@@ -191,7 +191,7 @@ public class ExecutionTest {
 			stageWithNamedThread = new InitialElementProducer<Object>(new Object());
 			Sink<Object> sink = new Sink<Object>();
 
-			declareActive(stageWithNamedThread, "TestName");
+			stageWithNamedThread.declareActive("TestName");
 
 			connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort());
 		}
diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
index 3086ed51..79786451 100644
--- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
+++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
@@ -18,6 +18,7 @@ package teetime.framework;
 import java.util.ArrayList;
 import java.util.List;
 
+import teetime.framework.pipe.IPipe;
 import teetime.framework.pipe.SpScPipeFactory;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
@@ -30,14 +31,15 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
 	public RunnableConsumerStageTestConfiguration(final Integer... inputElements) {
 		InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements);
 		if (inputElements.length > 0) {
-			declareActive(producer);
+			producer.declareActive();
 		}
 
 		CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
-		declareActive(collectorSink);
+		collectorSink.declareActive();
 
 		// Can not use createPorts, as the if condition above will lead to an exception
-		new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
+		IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
+		registerCustomPipe((AbstractPipe<?>) pipe);
 
 		this.collectorSink = collectorSink;
 	}
diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java
index 452cdbbe..f08b772f 100644
--- a/src/test/java/teetime/framework/TerminationTest.java
+++ b/src/test/java/teetime/framework/TerminationTest.java
@@ -55,11 +55,11 @@ public class TerminationTest {
 				connectPorts(init.getOutputPort(), firstProp.getInputPort());
 				connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity);
 				connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort());
-				declareActive(sinkStage);
+				sinkStage.declareActive();
 			} else {
 				Sink<Integer> sink = new Sink<Integer>();
 				connectPorts(init.getOutputPort(), sink.getInputPort(), capacity);
-				declareActive(sink);
+				sink.declareActive();
 			}
 		}
 
diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java
index aeae76c9..b4f3c0de 100644
--- a/src/test/java/teetime/framework/TraverserTest.java
+++ b/src/test/java/teetime/framework/TraverserTest.java
@@ -86,14 +86,14 @@ public class TraverserTest {
 				connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
 				connectPorts(wc.getOutputPort(), merger.getNewInputPort());
 				// Add WordCounter as a threadable stage, so it runs in its own thread
-				declareActive(wc.getInputPort().getOwningStage());
+				wc.getInputPort().getOwningStage().declareActive();
 			}
 
 			// Connect the stages of the last part
 			connectPorts(merger.getOutputPort(), result.getInputPort());
 
 			// Add the first and last part to the threadable stages
-			declareActive(merger);
+			merger.declareActive();
 		}
 
 	}
diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java
index 8003cb7d..507dc935 100644
--- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java
+++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java
@@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration {
 	public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
 
 		Stage producer = buildProducer(elements);
-		declareActive(producer);
+		producer.declareActive();
 
 		Stage consumer = buildConsumer(delay);
-		declareActive(consumer);
+		consumer.declareActive();
 
 		Clock clock = buildClock(initialDelayInMs, delay);
-		declareActive(clock);
+		clock.declareActive();
 	}
 
 	private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) {
diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java
index 0cc890ea..a44f45bf 100644
--- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java
+++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java
@@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration {
 	public YieldStrategyConfiguration(final Object... elements) {
 
 		InitialElementProducer<Object> producer = buildProducer(elements);
-		declareActive(producer);
+		producer.declareActive();
 
 		Stage consumer = buildConsumer(producer);
-		declareActive(consumer);
+		consumer.declareActive();
 	}
 
 	private InitialElementProducer<Object> buildProducer(final Object... elements) {
diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
index 54de3f41..79f7ff1d 100644
--- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
+++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
@@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration {
 		connectPorts(first.getOutputPort(), second.getInputPort());
 		// this.addThreadableStage(new ExceptionTestStage());
 
-		this.declareActive(second);
-		this.declareActive(third);
+		second.declareActive();
+		third.declareActive();
 	}
 
 }
diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java
index 553c5345..f8df4b07 100644
--- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java
+++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java
@@ -137,8 +137,8 @@ public class DynamicDistributorTest {
 			connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
 			connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
 
-			declareActive(distributor);
-			declareActive(collectorSink);
+			distributor.declareActive();
+			collectorSink.declareActive();
 
 			for (PortAction<DynamicDistributor<T>> a : inputActions) {
 				distributor.addPortActionRequest(a);
diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
index 0b5187a1..fdff1836 100644
--- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
+++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
@@ -118,7 +118,7 @@ public class DynamicMergerTest {
 			connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
 			connectPorts(merger.getOutputPort(), collectorSink.getInputPort());
 
-			declareActive(merger);
+			merger.declareActive();
 
 			for (PortAction<DynamicMerger<T>> a : inputActions) {
 				boolean added = merger.addPortActionRequest(a);
-- 
GitLab