From 0d73dd201d3badef1a8cf052ab866c5f383ddcbc Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de>
Date: Wed, 17 Jun 2015 19:23:58 +0200
Subject: [PATCH] New concept to enable multithreading in nested stages

---
 .../framework/AbstractCompositeStage.java     | 29 +++++++++++++++++--
 .../framework/AnalysisConfiguration.java      | 18 +++---------
 .../framework/AnalysisInstantiation.java      | 15 ++++++++++
 src/main/java/teetime/framework/Network.java  | 10 +++++++
 .../teetime/stage/io/EveryXthPrinter.java     |  6 ++--
 .../teetime/stage/string/WordCounter.java     |  6 ++--
 .../java/teetime/framework/TraversorTest.java |  4 +--
 7 files changed, 66 insertions(+), 22 deletions(-)
 create mode 100644 src/main/java/teetime/framework/Network.java

diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java
index 6623809d..582ef9b0 100644
--- a/src/main/java/teetime/framework/AbstractCompositeStage.java
+++ b/src/main/java/teetime/framework/AbstractCompositeStage.java
@@ -23,8 +23,33 @@ package teetime.framework;
  *
  *
  */
-public abstract class AbstractCompositeStage extends AnalysisConfiguration {
+public abstract class AbstractCompositeStage extends Network {
 
-	protected abstract Stage getFirstStage();
+	private final AnalysisConfiguration context;
+
+	public abstract Stage getFirstStage();
+
+	public AbstractCompositeStage(final AnalysisConfiguration context) {
+		this.context = context;
+	}
+
+	@Override
+	protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		context.connectPorts(sourcePort, targetPort, capacity);
+	}
+
+	@Override
+	protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		connectPorts(sourcePort, targetPort, 4);
+	}
+
+	@Override
+	protected void addThreadableStage(final Stage stage) {
+		context.addThreadableStage(stage);
+	}
+
+	AnalysisConfiguration getContext() {
+		return context;
+	}
 
 }
diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java
index 0d360b5b..7b285bcd 100644
--- a/src/main/java/teetime/framework/AnalysisConfiguration.java
+++ b/src/main/java/teetime/framework/AnalysisConfiguration.java
@@ -29,7 +29,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
  * Represents a configuration of connected stages, which is needed to run a analysis.
  * Stages can be added by executing {@link #addThreadableStage(Stage)}.
  */
-public abstract class AnalysisConfiguration {
+public abstract class AnalysisConfiguration extends Network {
 
 	private final Set<Stage> threadableStages = new HashSet<Stage>();
 
@@ -59,23 +59,11 @@ public abstract class AnalysisConfiguration {
 	 * @param stage
 	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
 	 */
+	@Override
 	protected final void addThreadableStage(final Stage stage) {
 		this.threadableStages.add(stage);
 	}
 
-	/**
-	 * Execute this method, to add a CompositeStage to the configuration, which should be executed in a own thread.
-	 *
-	 * @param stage
-	 *            A arbitrary CompositeStage, which will be added to the configuration and executed in a thread.
-	 */
-	protected final void addThreadableStage(final AbstractCompositeStage stage) {
-		this.threadableStages.add(stage.getFirstStage());
-		for (Stage threadableStage : stage.getThreadableStages()) {
-			this.addThreadableStage(threadableStage);
-		}
-	}
-
 	/**
 	 * Connects two stages with a pipe within the same thread.
 	 *
@@ -185,6 +173,7 @@ public abstract class AnalysisConfiguration {
 	 * @param <T>
 	 *            the type of elements to be sent
 	 */
+	@Override
 	protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		connectPorts(sourcePort, targetPort, 4);
 	}
@@ -201,6 +190,7 @@ public abstract class AnalysisConfiguration {
 	 * @param <T>
 	 *            the type of elements to be sent
 	 */
+	@Override
 	protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		new InstantiationPipe(sourcePort, targetPort, capacity);
 	}
diff --git a/src/main/java/teetime/framework/AnalysisInstantiation.java b/src/main/java/teetime/framework/AnalysisInstantiation.java
index d1283e22..2eba9e4c 100644
--- a/src/main/java/teetime/framework/AnalysisInstantiation.java
+++ b/src/main/java/teetime/framework/AnalysisInstantiation.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package teetime.framework;
 
 import java.util.HashMap;
diff --git a/src/main/java/teetime/framework/Network.java b/src/main/java/teetime/framework/Network.java
new file mode 100644
index 00000000..e73a7ebf
--- /dev/null
+++ b/src/main/java/teetime/framework/Network.java
@@ -0,0 +1,10 @@
+package teetime.framework;
+
+public abstract class Network {
+
+	protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity);
+
+	protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort);
+
+	protected abstract void addThreadableStage(final Stage stage);
+}
diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java
index c074dfd5..3b57c7df 100644
--- a/src/main/java/teetime/stage/io/EveryXthPrinter.java
+++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java
@@ -19,6 +19,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import teetime.framework.AbstractCompositeStage;
+import teetime.framework.AnalysisConfiguration;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
@@ -31,7 +32,8 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
 	private final Distributor<T> distributor;
 	private final List<Stage> lastStages = new ArrayList<Stage>();
 
-	public EveryXthPrinter(final int threshold) {
+	public EveryXthPrinter(final int threshold, final AnalysisConfiguration context) {
+		super(context);
 		distributor = new Distributor<T>(new CopyByReferenceStrategy());
 		EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
 		Printer<Integer> printer = new Printer<Integer>();
@@ -51,7 +53,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
 	}
 
 	@Override
-	protected Stage getFirstStage() {
+	public Stage getFirstStage() {
 		return distributor;
 	}
 
diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java
index 9c632b22..fad4c110 100644
--- a/src/main/java/teetime/stage/string/WordCounter.java
+++ b/src/main/java/teetime/stage/string/WordCounter.java
@@ -18,6 +18,7 @@ package teetime.stage.string;
 import java.util.ArrayList;
 
 import teetime.framework.AbstractCompositeStage;
+import teetime.framework.AnalysisConfiguration;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
@@ -41,7 +42,8 @@ public final class WordCounter extends AbstractCompositeStage {
 	private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
 
 	// The connection of the different stages is realized within the construction of a instance of this class.
-	public WordCounter() {
+	public WordCounter(final AnalysisConfiguration context) {
+		super(context);
 		this.lastStages.add(this.mapCounter);
 		final ToLowerCase toLowerCase = new ToLowerCase();
 
@@ -51,7 +53,7 @@ public final class WordCounter extends AbstractCompositeStage {
 	}
 
 	@Override
-	protected Stage getFirstStage() {
+	public Stage getFirstStage() {
 		return this.tokenizer;
 	}
 
diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java
index f8951048..6c092ac9 100644
--- a/src/test/java/teetime/framework/TraversorTest.java
+++ b/src/test/java/teetime/framework/TraversorTest.java
@@ -77,13 +77,13 @@ public class TraversorTest {
 			// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
 			for (int i = 0; i < threads; i++) {
 				// final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>();
-				final WordCounter wc = new WordCounter();
+				final WordCounter wc = new WordCounter(this);
 				// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
 
 				connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
 				connectPorts(wc.getOutputPort(), merger.getNewInputPort());
 				// Add WordCounter as a threadable stage, so it runs in its own thread
-				addThreadableStage(wc);
+				addThreadableStage(wc.getFirstStage());
 
 			}
 
-- 
GitLab