From b106588e3fb76287ce89a16ea843968148d6edad Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 3 Jun 2014 22:29:32 +0200
Subject: [PATCH] added performance analysis code in CollectorSink

---
 .../framework/concurrent/WorkerThread.java    | 16 +++++++--------
 .../java/teetime/framework/core/Context.java  | 19 +++++++-----------
 .../java/teetime/stage/CollectorSink.java     | 14 ++++++++++++-
 .../ThroughputTimestampAnalysisTest.java      | 20 ++-----------------
 4 files changed, 30 insertions(+), 39 deletions(-)

diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java
index 61181fc1..ceb04647 100644
--- a/src/main/java/teetime/framework/concurrent/WorkerThread.java
+++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java
@@ -64,7 +64,7 @@ public class WorkerThread extends Thread {
 
 		while (this.stageScheduler.isAnyStageActive()) {
 			iterations++;
-//			this.iterationStopWatch.start();
+			this.iterationStopWatch.start();
 
 			final IStage stage = this.stageScheduler.get();
 
@@ -77,13 +77,13 @@ public class WorkerThread extends Thread {
 			}
 			this.stageScheduler.determineNextStage(stage, executedSuccessfully);
 
-//			this.iterationStopWatch.end();
-//			final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
-//			schedulingOverheadInNs += schedulingOverhead;
-//			if ((iterations % 10000) == 0) {
-//				this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
-//				schedulingOverheadInNs = 0;
-//			}
+			this.iterationStopWatch.end();
+			final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
+			schedulingOverheadInNs += schedulingOverhead;
+			if ((iterations % 10000) == 0) {
+				this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
+				schedulingOverheadInNs = 0;
+			}
 		}
 
 		this.stopWatch.end();
diff --git a/src/main/java/teetime/framework/core/Context.java b/src/main/java/teetime/framework/core/Context.java
index e4990444..f0fc5450 100644
--- a/src/main/java/teetime/framework/core/Context.java
+++ b/src/main/java/teetime/framework/core/Context.java
@@ -5,12 +5,9 @@ import java.util.List;
 
 public class Context<S extends IStage> {
 
-	// private final Map<IPipe<Object>, List<Object>> pipesTakenFrom;
-	// private final Set<IStage> pipesPutTo = new HashSet<IStage>();
-
 	/**
 	 * @author Christian Wulf
-	 * 
+	 *
 	 * @since 1.10
 	 */
 	private static class InputPortContainer {
@@ -24,12 +21,11 @@ public class Context<S extends IStage> {
 	private final IOutputPort<S, ?>[] outputPorts;
 
 	// statistics values
-	private int numPushedElements = 0;
-	private int numTakenElements = 0;
+	private long numPushedElements = 0;
+	private long numTakenElements = 0;
 
 	@SuppressWarnings("unchecked")
 	public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) {
-		// this.pipesTakenFrom = this.createPipeMap(allTargetPorts);
 		this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts());
 		this.outputPorts = new IOutputPort[owningStage.getOutputPorts().size()];
 	}
@@ -56,12 +52,11 @@ public class Context<S extends IStage> {
 		associatedPipe.put(object);
 
 		this.outputPorts[port.getIndex()] = port;
-		// this.pipesPutTo.add(associatedPipe.getTargetPort().getOwningStage());
 		this.numPushedElements++;
 	}
 
 	/**
-	 * 
+	 *
 	 * @param inputPort
 	 * @return
 	 * @since 1.10
@@ -76,7 +71,7 @@ public class Context<S extends IStage> {
 	}
 
 	/**
-	 * 
+	 *
 	 * @param inputPort
 	 * @return
 	 * @since 1.10
@@ -99,10 +94,10 @@ public class Context<S extends IStage> {
 	}
 
 	/**
-	 * 
+	 *
 	 * @param inputPort
 	 * @return
-	 * 
+	 *
 	 * @since 1.10
 	 */
 	public <T> T read(final IInputPort<S, T> inputPort) {
diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java
index 69206195..82124438 100644
--- a/src/main/java/teetime/stage/CollectorSink.java
+++ b/src/main/java/teetime/stage/CollectorSink.java
@@ -16,14 +16,16 @@
 package teetime.stage;
 
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 import teetime.framework.core.AbstractFilter;
 import teetime.framework.core.Context;
 import teetime.framework.core.IInputPort;
+import teetime.util.StopWatch;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
 public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
@@ -51,10 +53,20 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
 		this.objects.add(object);
 		if ((this.objects.size() % THRESHOLD) == 0) {
 			System.out.println("size: " + this.objects.size());
+			stopWatch.end();
+			System.out.println("duration: "+TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs())+" ms");
+			stopWatch.start();
 		}
 
 		return true;
 	}
+StopWatch stopWatch=new StopWatch();
+
+	@Override
+	public void onPipelineStarts() throws Exception {
+		stopWatch.start();
+		super.onPipelineStarts();
+	}
 
 	public Collection<T> getObjects() {
 		return this.objects;
diff --git a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java
index d134990f..b6f1f8a3 100644
--- a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java
+++ b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java
@@ -34,29 +34,13 @@ import teetime.util.StopWatch;
  */
 public class ThroughputTimestampAnalysisTest {
 
-	private static final int NUM_OBJECTS_TO_CREATE = 50000;
+	private static final int NUM_OBJECTS_TO_CREATE = 100000;
 
 	@Before
 	public void before() {
 		System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
 	}
 
-	// Using QueuePipes ist 1/3 faster than using MethodCallPipes
-	// reason:
-	/*
-	 * MethodCallPipes:
-	 * <ul>
-	 * <li>SchedulingOverhead: 12629 ms
-	 * <li>ExecutedUnsuccessfullyCount: 80300001
-	 * </ul>
-	 *
-	 * QueuePipes:
-	 * <ul>
-	 * <li>SchedulingOverhead: 11337 ms
-	 * <li>ExecutedUnsuccessfullyCount: 804
-	 * </ul>
-	 */
-
 	@Test
 	public void testWithManyObjects() {
 		final StopWatch stopWatch = new StopWatch();
@@ -64,7 +48,7 @@ public class ThroughputTimestampAnalysisTest {
 
 		final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
 		analysis.setShouldUseQueue(true);
-		analysis.setNumNoopFilters(800);
+		analysis.setNumNoopFilters(8);
 		analysis.setTimestampObjects(timestampObjects);
 		analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
 			@Override
-- 
GitLab