From 1a954821a0a7ee6e8c7a90659f0b16872220753a Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 4 Jul 2014 10:54:53 +0200
Subject: [PATCH] fixed bugs due to the new signal concept

---
 conf/logging.properties                       |  9 ++--
 .../framework/core/AbstractStage.java         |  6 ++-
 .../framework/core/ConsumerStage.java         |  1 -
 .../framework/core/OutputPort.java            |  4 +-
 .../framework/core/Pipeline.java              | 49 ++++++++++---------
 .../framework/core/ProducerStage.java         |  5 +-
 .../framework/core/RunnableStage.java         |  2 +
 .../framework/core/pipe/IntraThreadPipe.java  |  4 +-
 .../stage/CollectorSink.java                  |  7 +--
 .../stage/Distributor.java                    | 21 ++++++++
 .../stage/ObjectProducer.java                 | 25 +---------
 .../methodcallWithPorts/stage/Relay.java      |  6 +--
 .../MethodCallThroughputAnalysis11.java       |  4 +-
 13 files changed, 80 insertions(+), 63 deletions(-)

diff --git a/conf/logging.properties b/conf/logging.properties
index fa346409..7e34493d 100644
--- a/conf/logging.properties
+++ b/conf/logging.properties
@@ -1,12 +1,13 @@
 .handlers = java.util.logging.ConsoleHandler
-.level = WARNING
+.level = ALL
 
 java.util.logging.ConsoleHandler.level = ALL
 #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
 java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
 
-#teetime.level = ALL
+teetime.level = ALL
 
-#teetime.variant.methodcallWithPorts.framework.core.level = ALL
-#teetime.variant.methodcallWithPorts.stage.level = FINE
+teetime.variant.methodcallWithPorts.framework.level = ALL
+teetime.variant.methodcallWithPorts.framework.core.level = ALL
+teetime.variant.methodcallWithPorts.stage.level = FINE
 #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 9c3b7905..e2014a6e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -107,6 +107,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
 	 */
 	@Override
 	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+		this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
+		// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
+
 		switch (signal) {
 		case FINISHED:
 			this.onFinished();
@@ -116,11 +119,12 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
 			break;
 		}
 
-		this.getOutputPort().sendSignal(signal);
+		this.outputPort.sendSignal(signal);
 	}
 
 	protected void onFinished() {
 		// empty default implementation
+		this.onIsPipelineHead();
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
index e43c0f44..9f17e535 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
@@ -1,6 +1,5 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
-
 public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index 7500e4de..e95b94fa 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -35,7 +35,9 @@ public class OutputPort<T> {
 	}
 
 	public void sendSignal(final Signal signal) {
-		this.pipe.setSignal(signal);
+		if (this.pipe != null) { // if the output port is connected with a pipe
+			this.pipe.setSignal(signal);
+		}
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
index 3a90f164..02fdaa01 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
@@ -68,25 +68,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
 		// headStage.sendFinishedSignalToAllSuccessorStages();
 
 		// this.updateRescheduable(headStage);
-	}
-
-	private final void updateRescheduable(final StageWithPort<?, ?> stage) {
-		StageWithPort<?, ?> currentStage = stage;
-		do {
-			this.firstStageIndex++;
-			// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
-			// if (currentStage == null) { // loop reaches the last stage
-			if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
-				this.setReschedulable(false);
-				this.cleanUp();
-				return;
-			}
-			currentStage = this.stages[this.firstStageIndex];
-			currentStage.onIsPipelineHead();
-		} while (!currentStage.isReschedulable());
 
-		this.setReschedulable(true);
-	}
+		// this.setReschedulable(headStage.isReschedulable());
+	}
+
+	// private final void updateRescheduable(final StageWithPort<?, ?> stage) {
+	// StageWithPort<?, ?> currentStage = stage;
+	// do {
+	// this.firstStageIndex++;
+	// // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
+	// // if (currentStage == null) { // loop reaches the last stage
+	// if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
+	// this.setReschedulable(false);
+	// this.cleanUp();
+	// return;
+	// }
+	// currentStage = this.stages[this.firstStageIndex];
+	// currentStage.onIsPipelineHead();
+	// } while (!currentStage.isReschedulable());
+	//
+	// this.setReschedulable(true);
+	// }
 
 	@Override
 	public void onIsPipelineHead() {
@@ -133,12 +135,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
 
 	@Override
 	public boolean isReschedulable() {
-		return this.reschedulable;
+		// return this.reschedulable;
+		return this.firstStage.isReschedulable();
 	}
 
-	public void setReschedulable(final boolean reschedulable) {
-		this.reschedulable = reschedulable;
-	}
+	// public void setReschedulable(final boolean reschedulable) {
+	// this.reschedulable = reschedulable;
+	// }
 
 	@Override
 	public InputPort<I> getInputPort() {
@@ -166,7 +169,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
 
 	@Override
 	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
-		throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port.");
+		this.firstStage.onSignal(signal, inputPort);
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
index f960b569..44646b80 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
@@ -1,6 +1,5 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
-
 public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
 
 	public ProducerStage() {
@@ -9,6 +8,10 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
 
 	@Override
 	public void executeWithPorts() {
+		// if (this.logger.isDebugEnabled()) {
+		// this.logger.debug("Executing stage...");
+		// }
+
 		this.execute5(null);
 
 		// if (!this.getOutputPort().pipe.isEmpty()) {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
index 62b23637..cb1e9329 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
@@ -15,6 +15,8 @@ public class RunnableStage<I> implements Runnable {
 
 	@Override
 	public void run() {
+		this.logger.debug("Executing runnable stage...");
+
 		try {
 			this.stage.onStart();
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
index 997665e1..59af28e5 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
@@ -6,7 +6,9 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
 
 	@Override
 	public void setSignal(final Signal signal) {
-		this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
+		if (this.getTargetPort() != null) {
+			this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
+		}
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
index 84a138e7..5e643e68 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
@@ -49,13 +49,14 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
 	@Override
 	protected void execute5(final T element) {
 		this.elements.add(element);
+
 		if ((this.elements.size() % THRESHOLD) == 0) {
 			System.out.println("size: " + this.elements.size());
 		}
 
-		if (this.elements.size() > 90000) {
-			// System.out.println("size > 90000: " + this.elements.size());
-		}
+		// if (this.elements.size() > 90000) {
+		// // System.out.println("size > 90000: " + this.elements.size());
+		// }
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
index 663b4d47..c700ebfa 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
@@ -6,7 +6,9 @@ import java.util.List;
 import teetime.util.concurrent.spsc.Pow2;
 import teetime.util.list.CommittableQueue;
 import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
+import teetime.variant.methodcallWithPorts.framework.core.Signal;
 
 public final class Distributor<T> extends AbstractStage<T, T> {
 
@@ -50,6 +52,25 @@ public final class Distributor<T> extends AbstractStage<T, T> {
 		// this.outputPortList.clear();
 	}
 
+	@Override
+	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+		this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
+		// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
+
+		switch (signal) {
+		case FINISHED:
+			this.onFinished();
+			break;
+		default:
+			this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
+			break;
+		}
+
+		for (OutputPort<?> op : this.outputPorts) {
+			op.sendSignal(signal);
+		}
+	}
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public void onStart() {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
index 61d001cd..b99078f9 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
@@ -53,23 +53,6 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
 		this.inputObjectCreator = inputObjectCreator;
 	}
 
-	// @Override
-	// protected void execute3() {
-	// if (this.numInputObjects == 0) {
-	// // this.getOutputPort().send((T) END_SIGNAL);
-	// return;
-	// }
-	//
-	// try {
-	// final T newObject = this.inputObjectCreator.call();
-	// this.numInputObjects--;
-	//
-	// // this.getOutputPort().send(newObject);
-	// } catch (final Exception e) {
-	// throw new IllegalStateException(e);
-	// }
-	// }
-
 	@Override
 	protected void execute4(final CommittableQueue<Void> elements) {
 		this.execute5(null);
@@ -77,6 +60,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
 
 	@Override
 	protected void execute5(final Void element) {
+		// this.logger.debug("Executing object producer...");
+
 		T newObject = null;
 		newObject = this.inputObjectCreator.create();
 		this.numInputObjects--;
@@ -90,10 +75,4 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
 		this.send(newObject);
 	}
 
-	// @Override
-	// public void onIsPipelineHead() {
-	// // this.getOutputPort().pipe = null; // no performance increase
-	// super.onIsPipelineHead();
-	// }
-
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
index 55f3591e..98cc5b97 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
@@ -7,7 +7,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 
 public class Relay<T> extends AbstractStage<T, T> {
 
-	private SpScPipe<T> inputPipe;
+	private SpScPipe<T> cachedCastedInputPipe;
 
 	public Relay() {
 		this.setReschedulable(true);
@@ -18,7 +18,7 @@ public class Relay<T> extends AbstractStage<T, T> {
 		T element = this.getInputPort().receive();
 		if (null == element) {
 			// if (this.getInputPort().getPipe().isClosed()) {
-			if (this.inputPipe.getSignal() == Signal.FINISHED) {
+			if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) {
 				this.setReschedulable(false);
 				assert 0 == this.getInputPort().getPipe().size();
 			}
@@ -30,7 +30,7 @@ public class Relay<T> extends AbstractStage<T, T> {
 
 	@Override
 	public void onStart() {
-		this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
+		this.cachedCastedInputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
 		super.onStart();
 	}
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
index ca600712..db3ae9e4 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
@@ -45,8 +45,8 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<?, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
-		this.runnable = new RunnableStage(pipeline);
+		Pipeline<Void, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
+		this.runnable = new RunnableStage<Void>(pipeline);
 	}
 
 	/**
-- 
GitLab