From 3620961a6e5f6aebe87c49570a033b8fbc1640ba Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 8 Jul 2014 09:00:41 +0200
Subject: [PATCH] replaced logging in SpScPipe by #waits

---
 pom.xml                                       |  2 +-
 .../framework/core/AbstractStage.java         | 15 +++++++++-----
 .../framework/core/OutputPort.java            |  9 +++++++--
 .../framework/core/pipe/IPipe.java            |  2 +-
 .../core/pipe/OrderedGrowableArrayPipe.java   |  3 ++-
 .../core/pipe/OrderedGrowablePipe.java        |  4 ++--
 .../framework/core/pipe/Pipe.java             |  3 ++-
 .../core/pipe/SingleElementPipe.java          |  3 ++-
 .../framework/core/pipe/SpScPipe.java         | 20 +++++++++++++------
 .../core/pipe/UnorderedGrowablePipe.java      |  3 ++-
 .../basic/distributor/RoundRobinStrategy.java |  1 +
 .../stage/io/TCPReader.java                   |  2 +-
 .../kiekerdays/TcpTraceReconstruction.java    |  6 +++---
 .../kiekerdays/TcpTraceReduction.java         |  6 +++---
 ...ReconstructionAnalysisWithThreadsTest.java |  6 +++---
 ...TraceReductionAnalysisWithThreadsTest.java |  2 +-
 16 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1a5601f8..e0c85b35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@
 
 	<build>
 		<plugins>
-			<!-- we want JDK 1.6 source and binary compatiblility -->
+			<!-- we want JDK 1.6 source and binary compatibility -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 49a4cbde..e3834bcd 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -44,16 +44,19 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
 	protected abstract void execute5(I element);
 
 	/**
-	 * Sends the <code>element</code> using the default output port
+	 * Sends the given <code>element</code> using the default output port
 	 * 
 	 * @param element
+	 * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy)
 	 */
-	protected final void send(final O element) {
-		this.send(this.getOutputPort(), element);
+	protected final boolean send(final O element) {
+		return this.send(this.getOutputPort(), element);
 	}
 
-	protected final void send(final OutputPort<O> outputPort, final O element) {
-		outputPort.send(element);
+	protected final boolean send(final OutputPort<O> outputPort, final O element) {
+		if (!outputPort.send(element)) {
+			return false;
+		}
 
 		// StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
 		StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
@@ -61,6 +64,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
 		do {
 			next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
 		} while (next.isReschedulable());
+
+		return true;
 	}
 
 	// public void disable() {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index e95b94fa..6d5e197f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -14,8 +14,13 @@ public class OutputPort<T> {
 	 */
 	private StageWithPort<?, ?> cachedTargetStage;
 
-	public void send(final T element) {
-		this.pipe.add(element);
+	/**
+	 * 
+	 * @param element
+	 * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy)
+	 */
+	public boolean send(final T element) {
+		return this.pipe.add(element);
 	}
 
 	public IPipe<T> getPipe() {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
index 4b94be69..e09d4fe8 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
@@ -5,7 +5,7 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal;
 
 public interface IPipe<T> {
 
-	void add(T element);
+	boolean add(T element);
 
 	T removeLast();
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index fc3d224a..d289f4c8 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -26,8 +26,9 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
 	}
 
 	@Override
-	public void add(final T element) {
+	public boolean add(final T element) {
 		this.elements.put(this.tail++, element);
+		return true;
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
index eb92d943..a69db373 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
@@ -25,8 +25,8 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
 	}
 
 	@Override
-	public void add(final T element) {
-		this.elements.offer(element);
+	public boolean add(final T element) {
+		return this.elements.offer(element);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
index eda193f3..8d8053f6 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
@@ -21,9 +21,10 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 	 * @see teetime.examples.throughput.methodcall.IPipe#add(T)
 	 */
 	@Override
-	public void add(final T element) {
+	public boolean add(final T element) {
 		this.elements.addToTailUncommitted(element);
 		this.elements.commit();
+		return true;
 	}
 
 	/*
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index c3581c1a..c5e02be9 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -15,8 +15,9 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> {
 	}
 
 	@Override
-	public void add(final T element) {
+	public boolean add(final T element) {
 		this.element = element;
+		return true;
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 836af47b..65143e7f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -15,8 +15,9 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal;
 public class SpScPipe<T> extends AbstractPipe<T> {
 
 	private final Queue<T> queue;
-	private int maxSize;
 	private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
+	// statistics
+	private int numWaits;
 
 	public SpScPipe(final int capacity) {
 		ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT);
@@ -32,9 +33,16 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 	}
 
 	@Override
-	public void add(final T element) {
-		this.queue.offer(element);
-		this.maxSize = Math.max(this.queue.size(), this.maxSize);
+	public boolean add(final T element) {
+		// this.maxSize = Math.max(this.queue.size(), this.maxSize);
+
+		// BETTER introduce a QueueIsFullStrategy
+		while (!this.queue.offer(element)) {
+			this.numWaits++;
+			Thread.yield();
+		}
+
+		return true;
 	}
 
 	@Override
@@ -58,8 +66,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 	}
 
 	// BETTER find a solution w/o any thread-safe code in this stage
-	public synchronized int getMaxSize() {
-		return this.maxSize;
+	public synchronized int getNumWaits() {
+		return this.numWaits;
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index 1ead0076..2b908268 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -25,13 +25,14 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 	}
 
 	@Override
-	public void add(final T element) {
+	public boolean add(final T element) {
 		if (this.lastFreeIndex == this.elements.length) {
 			// if (this.lastFreeIndex == this.elements.getCapacity()) {
 			this.elements = this.grow();
 		}
 		this.elements[this.lastFreeIndex++] = element;
 		// this.elements.put(this.lastFreeIndex++, element);
+		return true;
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java
index 13b27fa3..0a3dfb78 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java
@@ -31,6 +31,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
 	@Override
 	public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) {
 		final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
+
 		outputPort.send(element);
 
 		return true;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
index a5d73561..99626076 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
@@ -151,7 +151,7 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
 						final long loggingTimestamp = buffer.getLong();
 						final IMonitoringRecord record;
 						try { // NOCS (Nested try-catch)
-						// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
+								// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
 							record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
 							record.setLoggingTimestamp(loggingTimestamp);
 							this.send(record);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
index 575a3549..6328a5d7 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
@@ -114,11 +114,11 @@ public class TcpTraceReconstruction extends Analysis {
 
 	@Override
 	public void onTerminate() {
-		int maxSize = 0;
+		int maxNumWaits = 0;
 		for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
-			maxSize = Math.max(maxSize, pipe.getMaxSize());
+			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
 		}
-		System.out.println("max size of TcpRelayPipes: " + maxSize);
+		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
 		super.onTerminate();
 	}
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
index 16655759..d73c8032 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
@@ -148,11 +148,11 @@ public class TcpTraceReduction extends Analysis {
 
 	@Override
 	public void onTerminate() {
-		int maxSize = 0;
+		int maxNumWaits = 0;
 		for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
-			maxSize = Math.max(maxSize, pipe.getMaxSize());
+			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
 		}
-		System.out.println("max size of TcpRelayPipes: " + maxSize);
+		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
 		super.onTerminate();
 	}
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
index bf0f636f..db807107 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -99,11 +99,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
 			analysis.onTerminate();
 		}
 
-		int maxSize = 0;
+		int maxNumWaits = 0;
 		for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
-			maxSize = Math.max(maxSize, pipe.getMaxSize());
+			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
 		}
-		System.out.println("Max size of tcp-relay pipe: " + maxSize);
+		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
 
 		// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
 		// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
index 61907ac6..d5bd3c9c 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
@@ -83,7 +83,7 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
 			analysis.onTerminate();
 		}
 
-		System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
+		System.out.println("#waits of tcp-relay pipe: " + analysis.getTcpRelayPipe().getNumWaits());
 		// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
 		// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
 		System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs());
-- 
GitLab