From 630507d1fbd97e63dbbca8690dc444187613056c Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 8 Jul 2014 20:39:55 +0200
Subject: [PATCH] added ChwHomeTcpTraceReadingTest

---
 .../ElementThroughputMeasuringStage.java      |  39 +++----
 .../examples/kiekerdays/TcpTraceLogging.java  |  39 ++-----
 .../ChwHomeTcpTraceReadingTest.java           |  79 ++++++++++++++
 .../TcpTraceLoggingExtAnalysis.java           | 102 ++++++++++++++++++
 ...ReconstructionAnalysisWithThreadsTest.java |   3 +-
 ...raceReconstructionAnalysisWithThreads.java |   3 +-
 .../TcpTraceReductionAnalysisWithThreads.java |   3 +-
 7 files changed, 217 insertions(+), 51 deletions(-)
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
index 956e13a..9b742a0 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
@@ -35,24 +35,27 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
 
 	private void computeElementThroughput(final Long timestampInNs) {
 		long diffInNs = timestampInNs - this.lastTimestampInNs;
-
-		// BETTER use the TimeUnit of the clock
-		long throughputPerTimeUnit = -1;
-
-		long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
-		if (diffInSec > 0) {
-			throughputPerTimeUnit = this.numPassedElements / diffInSec;
-			this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
-		} else {
-			long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
-			if (diffInMs > 0) {
-				throughputPerTimeUnit = this.numPassedElements / diffInMs;
-				this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
-
-			}
-		}
-
-		this.throughputs.add(throughputPerTimeUnit);
+		// the minimum time granularity of the clock is ms
+		long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+		double throughputPerMs = (double) this.numPassedElements / diffInMs;
+		this.logger.info("Throughput: " + String.format("%.3f", throughputPerMs) + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
+
+		// long throughputPerTimeUnit = -1;
+		//
+		// long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
+		// if (diffInSec > 0) {
+		// throughputPerTimeUnit = this.numPassedElements / diffInSec;
+		// this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
+		// } else {
+		// long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+		// if (diffInMs > 0) {
+		// throughputPerTimeUnit = this.numPassedElements / diffInMs;
+		// this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
+		//
+		// }
+		// }
+
+		this.throughputs.add((long) throughputPerMs);
 		this.resetTimestamp(timestampInNs);
 	}
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
index cfa1d8a..3414649 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
@@ -1,8 +1,5 @@
 package teetime.variant.methodcallWithPorts.examples.kiekerdays;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import teetime.variant.explicitScheduling.framework.core.Analysis;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
@@ -10,17 +7,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 
-import kieker.analysis.plugin.filter.flow.TraceEventRecords;
 import kieker.common.record.IMonitoringRecord;
 
 public class TcpTraceLogging extends Analysis {
 
-	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
-
 	private Thread tcpThread;
 
-	private int numWorkerThreads;
-
 	@Override
 	public void init() {
 		super.init();
@@ -28,19 +20,6 @@ public class TcpTraceLogging extends Analysis {
 		this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
 	}
 
-	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
-		TCPReaderSink tcpReader = new TCPReaderSink();
-		EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
-
-		SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
-
-		// create and configure pipeline
-		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(endStage);
-		return pipeline;
-	}
-
 	@Override
 	public void start() {
 		super.start();
@@ -54,21 +33,21 @@ public class TcpTraceLogging extends Analysis {
 		}
 	}
 
-	public List<TraceEventRecords> getElementCollection() {
-		return this.elementCollection;
-	}
+	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
+		TCPReaderSink tcpReader = new TCPReaderSink();
+		EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
 
-	public int getNumWorkerThreads() {
-		return this.numWorkerThreads;
-	}
+		SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
 
-	public void setNumWorkerThreads(final int numWorkerThreads) {
-		this.numWorkerThreads = numWorkerThreads;
+		// create and configure pipeline
+		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+		pipeline.setFirstStage(tcpReader);
+		pipeline.setLastStage(endStage);
+		return pipeline;
 	}
 
 	public static void main(final String[] args) {
 		final TcpTraceLogging analysis = new TcpTraceLogging();
-		analysis.setNumWorkerThreads(1);
 
 		analysis.init();
 		try {
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
new file mode 100644
index 0000000..e3d18e3
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
@@ -0,0 +1,79 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.examples.traceReading;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import teetime.util.ListUtil;
+import teetime.util.StatisticsUtil;
+import teetime.util.StopWatch;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ChwHomeTcpTraceReadingTest {
+
+	private static final int MIO = 1000000;
+	private static final int EXPECTED_NUM_TRACES = 10 * MIO;
+	private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
+
+	private StopWatch stopWatch;
+
+	@Before
+	public void before() {
+		this.stopWatch = new StopWatch();
+	}
+
+	@After
+	public void after() {
+		long overallDurationInNs = this.stopWatch.getDurationInNs();
+		System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
+	}
+
+	@Test
+	public void performAnalysis() {
+		final TcpTraceLoggingExtAnalysis analysis = new TcpTraceLoggingExtAnalysis();
+		analysis.init();
+
+		this.stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			this.stopWatch.end();
+			analysis.onTerminate();
+		}
+
+		List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
+		Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
+		System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
+
+		assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
+	}
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
new file mode 100644
index 0000000..08a004e
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
@@ -0,0 +1,102 @@
+package teetime.variant.methodcallWithPorts.examples.traceReading;
+
+import java.util.List;
+
+import teetime.variant.explicitScheduling.framework.core.Analysis;
+import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.stage.Clock;
+import teetime.variant.methodcallWithPorts.stage.Counter;
+import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
+import teetime.variant.methodcallWithPorts.stage.EndStage;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
+import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
+
+import kieker.common.record.IMonitoringRecord;
+
+public class TcpTraceLoggingExtAnalysis extends Analysis {
+
+	private static final int MIO = 1000000;
+	private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
+
+	private Thread clockThread;
+	private Thread tcpThread;
+
+	private Counter<IMonitoringRecord> recordCounter;
+	private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
+
+	private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
+		Clock clockStage = new Clock();
+		clockStage.setInitialDelayInMs(intervalDelayInMs);
+		clockStage.setIntervalDelayInMs(intervalDelayInMs);
+		Distributor<Long> distributor = new Distributor<Long>();
+
+		SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
+
+		// create and configure pipeline
+		Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
+		pipeline.setFirstStage(clockStage);
+		pipeline.setLastStage(distributor);
+		return pipeline;
+	}
+
+	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline(final StageWithPort<Void, Long> clockPipeline) {
+		TCPReader tcpReader = new TCPReader();
+		this.recordCounter = new Counter<IMonitoringRecord>();
+		this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
+		EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
+
+		SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
+		SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
+		SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
+
+		SpScPipe.connect(clockPipeline.getOutputPort(), this.recordThroughputStage.getTriggerInputPort(), TCP_RELAY_MAX_SIZE);
+
+		// create and configure pipeline
+		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+		pipeline.setFirstStage(tcpReader);
+		pipeline.addIntermediateStage(this.recordCounter);
+		pipeline.addIntermediateStage(this.recordThroughputStage);
+		pipeline.setLastStage(endStage);
+		return pipeline;
+	}
+
+	@Override
+	public void init() {
+		super.init();
+
+		StageWithPort<Void, Long> clockPipeline = this.buildClockPipeline(1000);
+		this.clockThread = new Thread(new RunnableStage<Void>(clockPipeline));
+
+		StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(clockPipeline);
+		this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
+	}
+
+	@Override
+	public void start() {
+		super.start();
+
+		this.tcpThread.start();
+		this.clockThread.start();
+
+		try {
+			this.tcpThread.join();
+		} catch (InterruptedException e) {
+			throw new IllegalStateException(e);
+		}
+
+		this.clockThread.interrupt();
+	}
+
+	public int getNumRecords() {
+		return this.recordCounter.getNumElementsPassed();
+	}
+
+	public List<Long> getRecordThroughputs() {
+		return this.recordThroughputStage.getThroughputs();
+	}
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
index db80710..147bed6 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -42,7 +42,8 @@ import kieker.common.record.IMonitoringRecord;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
 
-	private static final int EXPECTED_NUM_TRACES = 1000000;
+	private static final int MIO = 1000000;
+	private static final int EXPECTED_NUM_TRACES = 1 * MIO;
 
 	private StopWatch stopWatch;
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index 20a52ac..1c59cc3 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
 public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
 
 	private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
-	private static final int TCP_RELAY_MAX_SIZE = 2000000;
+	private static final int MIO = 1000000;
+	private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index c93ffd3..6c1df93 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -39,7 +39,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
 public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 
 	private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
-	private static final int TCP_RELAY_MAX_SIZE = 500000;
+	private static final int MIO = 1000000;
+	private static final int TCP_RELAY_MAX_SIZE = (int) (0.5 * MIO);
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
-- 
GitLab