From d83dea0e2244a948f047fa607385325a4fc8a1f5 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de>
Date: Fri, 7 Nov 2014 13:40:11 +0100
Subject: [PATCH] Added functionality from prod-cons-abstraction branch

---
 .../java/teetime/framework/AbstractStage.java | 18 +++++++-
 src/main/java/teetime/framework/Analysis.java |  6 +--
 .../framework/AnalysisConfiguration.java      | 43 ++++++++++++++++---
 .../java/teetime/framework/ProducerStage.java | 12 ++----
 .../java/teetime/framework/RunnableStage.java |  4 +-
 src/main/java/teetime/framework/Stage.java    |  2 +-
 .../{HeadStage.java => Terminable.java}       |  7 ++-
 .../framework/TerminationStrategy.java        |  5 +++
 .../java/teetime/stage/io/File2ByteArray.java |  4 +-
 .../MethodCallThroughputAnalysis9.java        |  4 +-
 .../MethodCallThroughputAnalysis11.java       |  4 +-
 .../MethodCallThroughputAnalysis14.java       |  4 +-
 .../MethodCallThroughputAnalysis15.java       |  4 +-
 .../teetime/framework/OldHeadPipeline.java    |  2 +-
 .../java/teetime/framework/OldPipeline.java   | 18 ++++++++
 15 files changed, 102 insertions(+), 35 deletions(-)
 rename src/main/java/teetime/framework/{HeadStage.java => Terminable.java} (53%)
 create mode 100644 src/main/java/teetime/framework/TerminationStrategy.java

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 63ef5546..a7fb99ed 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -33,6 +33,7 @@ public abstract class AbstractStage implements Stage {
 	protected OutputPort<?>[] cachedOutputPorts;
 
 	private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>();
+	private boolean shouldTerminate;
 
 	public AbstractStage() {
 		this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
@@ -125,7 +126,7 @@ public abstract class AbstractStage implements Stage {
 	}
 
 	public void onTerminating() throws Exception {
-		// empty default implementation
+		terminate();
 	}
 
 	protected <T> InputPort<T> createInputPort() {
@@ -162,4 +163,19 @@ public abstract class AbstractStage implements Stage {
 		return this.getClass().getName() + ": " + this.id;
 	}
 
+	@Override
+	public void terminate() {
+		this.shouldTerminate = true;
+	}
+
+	@Override
+	public boolean shouldBeTerminated() {
+		return this.shouldTerminate;
+	}
+
+	@Override
+	public TerminationStrategy getTerminationStrategy() {
+		return TerminationStrategy.BY_SIGNAL;
+	}
+
 }
diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java
index c9fd8e11..40aad369 100644
--- a/src/main/java/teetime/framework/Analysis.java
+++ b/src/main/java/teetime/framework/Analysis.java
@@ -28,17 +28,17 @@ public class Analysis implements UncaughtExceptionHandler {
 	}
 
 	public void init() {
-		for (HeadStage stage : this.configuration.getConsumerStages()) {
+		for (Stage stage : this.configuration.getConsumerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.consumerThreads.add(thread);
 		}
 
-		for (HeadStage stage : this.configuration.getFiniteProducerStages()) {
+		for (Stage stage : this.configuration.getFiniteProducerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.finiteProducerThreads.add(thread);
 		}
 
-		for (HeadStage stage : this.configuration.getInfiniteProducerStages()) {
+		for (Stage stage : this.configuration.getInfiniteProducerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.infiniteProducerThreads.add(thread);
 		}
diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java
index c0515944..67eb52d9 100644
--- a/src/main/java/teetime/framework/AnalysisConfiguration.java
+++ b/src/main/java/teetime/framework/AnalysisConfiguration.java
@@ -9,20 +9,51 @@ public class AnalysisConfiguration {
 
 	protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
 
-	private final List<HeadStage> consumerStages = new LinkedList<HeadStage>();
-	private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>();
-	private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>();
+	private final List<Runnable> threadableStageJobs = new LinkedList<Runnable>();
 
-	public List<HeadStage> getConsumerStages() {
+	private final List<Stage> consumerStages = new LinkedList<Stage>();
+	private final List<Stage> finiteProducerStages = new LinkedList<Stage>();
+	private final List<Stage> infiniteProducerStages = new LinkedList<Stage>();
+
+	public List<Stage> getConsumerStages() {
 		return this.consumerStages;
 	}
 
-	public List<HeadStage> getFiniteProducerStages() {
+	public List<Stage> getFiniteProducerStages() {
 		return this.finiteProducerStages;
 	}
 
-	public List<HeadStage> getInfiniteProducerStages() {
+	public List<Stage> getInfiniteProducerStages() {
 		return this.infiniteProducerStages;
 	}
 
+	public void addThreadableStage(final Stage stage) {
+		// wrap the stage categorization in a runnable
+		// because the termination strategy could depend on port configuration that is set later
+		final Runnable addThreadableStageJob = new Runnable() {
+			@Override
+			public void run() {
+				switch (stage.getTerminationStrategy()) {
+				case BY_SIGNAL:
+					consumerStages.add(stage);
+					break;
+				case BY_SELF_DECISION:
+					finiteProducerStages.add(stage);
+					break;
+				case BY_INTERRUPT:
+					infiniteProducerStages.add(stage);
+					break;
+				}
+			}
+		};
+
+		threadableStageJobs.add(addThreadableStageJob);
+	}
+
+	void init() {
+		for (Runnable job : threadableStageJobs) {
+			job.run();
+		}
+	}
+
 }
diff --git a/src/main/java/teetime/framework/ProducerStage.java b/src/main/java/teetime/framework/ProducerStage.java
index ac3dbd1f..02348cd9 100644
--- a/src/main/java/teetime/framework/ProducerStage.java
+++ b/src/main/java/teetime/framework/ProducerStage.java
@@ -9,10 +9,9 @@ package teetime.framework;
  *            the type of the default output port
  *
  */
-public abstract class ProducerStage<O> extends AbstractStage implements HeadStage {
+public abstract class ProducerStage<O> extends AbstractStage implements Stage {
 
 	protected final OutputPort<O> outputPort = this.createOutputPort();
-	private boolean shouldTerminate;
 
 	public final OutputPort<O> getOutputPort() {
 		return this.outputPort;
@@ -24,13 +23,8 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag
 	}
 
 	@Override
-	public void terminate() {
-		this.shouldTerminate = true;
-	}
-
-	@Override
-	public boolean shouldBeTerminated() {
-		return this.shouldTerminate;
+	public TerminationStrategy getTerminationStrategy() {
+		return TerminationStrategy.BY_SELF_DECISION;
 	}
 
 	protected abstract void execute();
diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java
index a2240015..882423a3 100644
--- a/src/main/java/teetime/framework/RunnableStage.java
+++ b/src/main/java/teetime/framework/RunnableStage.java
@@ -10,11 +10,11 @@ import teetime.framework.validation.AnalysisNotValidException;
 
 public class RunnableStage implements Runnable {
 
-	private final HeadStage stage;
+	private final Stage stage;
 	private final Logger logger;
 	private boolean validationEnabled;
 
-	public RunnableStage(final HeadStage stage) {
+	public RunnableStage(final Stage stage) {
 		this.stage = stage;
 		this.logger = LoggerFactory.getLogger(stage.getClass());
 	}
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index 3f0798ac..55d676b9 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -5,7 +5,7 @@ import java.util.List;
 import teetime.framework.signal.ISignal;
 import teetime.framework.validation.InvalidPortConnection;
 
-public interface Stage {
+public interface Stage extends Terminable {
 
 	String getId();
 
diff --git a/src/main/java/teetime/framework/HeadStage.java b/src/main/java/teetime/framework/Terminable.java
similarity index 53%
rename from src/main/java/teetime/framework/HeadStage.java
rename to src/main/java/teetime/framework/Terminable.java
index 5e8de765..fd606b2f 100644
--- a/src/main/java/teetime/framework/HeadStage.java
+++ b/src/main/java/teetime/framework/Terminable.java
@@ -1,8 +1,11 @@
 package teetime.framework;
 
-public interface HeadStage extends Stage {
+interface Terminable {
 
-	boolean shouldBeTerminated();
+	TerminationStrategy getTerminationStrategy();
 
 	void terminate();
+
+	boolean shouldBeTerminated();
+
 }
diff --git a/src/main/java/teetime/framework/TerminationStrategy.java b/src/main/java/teetime/framework/TerminationStrategy.java
new file mode 100644
index 00000000..eef7d787
--- /dev/null
+++ b/src/main/java/teetime/framework/TerminationStrategy.java
@@ -0,0 +1,5 @@
+package teetime.framework;
+
+public enum TerminationStrategy {
+	BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT
+}
diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java
index 390598a1..aa337687 100644
--- a/src/main/java/teetime/stage/io/File2ByteArray.java
+++ b/src/main/java/teetime/stage/io/File2ByteArray.java
@@ -4,12 +4,12 @@ import java.io.File;
 import java.io.IOException;
 
 import teetime.framework.ConsumerStage;
-import teetime.framework.HeadStage;
 import teetime.framework.OutputPort;
+import teetime.framework.Stage;
 
 import com.google.common.io.Files;
 
-public class File2ByteArray extends ConsumerStage<File> implements HeadStage {
+public class File2ByteArray extends ConsumerStage<File> implements Stage {
 
 	private final OutputPort<byte[]> outputPort = this.createOutputPort();
 
diff --git a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
index 882af15e..785ef39d 100644
--- a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
+++ b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
@@ -18,8 +18,8 @@ package teetime.examples.experiment09;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
 import teetime.framework.RunnableStage;
+import teetime.framework.Stage;
 import teetime.framework.pipe.CommittablePipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 {
 	private Runnable runnable;
 
 	public void init() {
-		HeadStage pipeline = this.buildPipeline();
+		Stage pipeline = this.buildPipeline();
 		this.runnable = new RunnableStage(pipeline);
 	}
 
diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
index e24c7d48..31e7dfbd 100644
--- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
+++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
@@ -18,8 +18,8 @@ package teetime.examples.experiment11;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
 import teetime.framework.RunnableStage;
+import teetime.framework.Stage;
 import teetime.framework.pipe.UnorderedGrowablePipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 {
 	private Runnable runnable;
 
 	public void init() {
-		HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
+		Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
 		this.runnable = new RunnableStage(pipeline);
 	}
 
diff --git a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
index 01de043b..f744ee4a 100644
--- a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
+++ b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
@@ -18,8 +18,8 @@ package teetime.examples.experiment14;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
 import teetime.framework.RunnableStage;
+import teetime.framework.Stage;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
@@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 {
 	private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE;
 
 	public void init() {
-		HeadStage pipeline = this.buildPipeline();
+		Stage pipeline = this.buildPipeline();
 		this.runnable = new RunnableStage(pipeline);
 	}
 
diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
index 6b92bec5..df9e7a4f 100644
--- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -18,8 +18,8 @@ package teetime.examples.experiment15;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
 import teetime.framework.RunnableStage;
+import teetime.framework.Stage;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 {
 		OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
 		this.clockRunnable = new RunnableStage(clockPipeline);
 
-		HeadStage pipeline = this.buildPipeline(this.clock);
+		Stage pipeline = this.buildPipeline(this.clock);
 		this.runnable = new RunnableStage(pipeline);
 	}
 
diff --git a/src/performancetest/java/teetime/framework/OldHeadPipeline.java b/src/performancetest/java/teetime/framework/OldHeadPipeline.java
index d3dd6414..4d166117 100644
--- a/src/performancetest/java/teetime/framework/OldHeadPipeline.java
+++ b/src/performancetest/java/teetime/framework/OldHeadPipeline.java
@@ -1,7 +1,7 @@
 package teetime.framework;
 
 @Deprecated
-public class OldHeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage {
+public class OldHeadPipeline<FirstStage extends Stage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements Stage {
 
 	public OldHeadPipeline() {}
 
diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java
index 11ff018d..e6c8244b 100644
--- a/src/performancetest/java/teetime/framework/OldPipeline.java
+++ b/src/performancetest/java/teetime/framework/OldPipeline.java
@@ -57,4 +57,22 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> impl
 		this.lastStage.validateOutputPorts(invalidPortConnections);
 	}
 
+	@Override
+	public TerminationStrategy getTerminationStrategy() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void terminate() {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public boolean shouldBeTerminated() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
 }
-- 
GitLab