From 723a3ae42e1ef6bd9c9a6e359ff6e62023efd312 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 27 Jun 2014 05:26:16 +0200
Subject: [PATCH] upgraded to kieker 1.10-snapshot; added logs and analysis for
 eprints, kieker, kieker2

---
 .classpath                                    |  4 +-
 pom.xml                                       | 85 ++++++++++---------
 .../java/teetime/util/StatisticsUtil.java     |  4 -
 .../methodcallWithPorts/stage/Clock.java      |  2 +-
 .../stage/ThroughputFilter.java               | 14 +--
 .../TraceReconstructionAnalysis.java          | 49 ++++++-----
 .../TraceReconstructionAnalysisTest.java      | 65 +++++++++++++-
 7 files changed, 149 insertions(+), 74 deletions(-)

diff --git a/.classpath b/.classpath
index c9522874..21206d20 100644
--- a/.classpath
+++ b/.classpath
@@ -13,12 +13,12 @@
 		</attributes>
 	</classpathentry>
 	<classpathentry kind="src" path="conf"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
+	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
 		<attributes>
 			<attribute name="maven.pomderived" value="true"/>
 		</attributes>
 	</classpathentry>
-	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
 		<attributes>
 			<attribute name="maven.pomderived" value="true"/>
 		</attributes>
diff --git a/pom.xml b/pom.xml
index 1edcb345..c2c4cc67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,45 +1,52 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
 
-  <groupId>net.sourceforge.teetime</groupId>
-  <artifactId>teetime</artifactId>
-  <version>1.0-SNAPSHOT</version>
-  <packaging>jar</packaging>
+	<groupId>net.sourceforge.teetime</groupId>
+	<artifactId>teetime</artifactId>
+	<version>1.0-SNAPSHOT</version>
+	<packaging>jar</packaging>
 
-  <name>teetime</name>
-  <url>http://maven.apache.org</url>
+	<name>teetime</name>
+	<url>http://maven.apache.org</url>
 
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
 
-  <dependencies>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.11</version>
-      <scope>test</scope>
-    </dependency>
-	<dependency>
-		<groupId>org.hamcrest</groupId>
-		<artifactId>hamcrest-core</artifactId>
-		<version>1.3</version>
-	</dependency>
-	<dependency>
-		<groupId>org.hamcrest</groupId>
-		<artifactId>hamcrest-library</artifactId>
-		<version>1.3</version>
-	</dependency>
-	<dependency>
-      <groupId>net.kieker-monitoring</groupId>
-      <artifactId>kieker</artifactId>
-      <version>1.9</version>
-    </dependency>
-	<dependency>
-		<groupId>com.google.guava</groupId>
-		<artifactId>guava</artifactId>
-		<version>17.0</version>
-	</dependency>
-  </dependencies>
+	<repositories>
+		<repository>
+			<id>sonatype.oss.snapshots</id>
+			<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.hamcrest</groupId>
+			<artifactId>hamcrest-core</artifactId>
+			<version>1.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.hamcrest</groupId>
+			<artifactId>hamcrest-library</artifactId>
+			<version>1.3</version>
+		</dependency>
+		<dependency>
+			<groupId>net.kieker-monitoring</groupId>
+			<artifactId>kieker</artifactId>
+			<version>1.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>17.0</version>
+		</dependency>
+	</dependencies>
 </project>
diff --git a/src/main/java/teetime/util/StatisticsUtil.java b/src/main/java/teetime/util/StatisticsUtil.java
index 38788e19..69655706 100644
--- a/src/main/java/teetime/util/StatisticsUtil.java
+++ b/src/main/java/teetime/util/StatisticsUtil.java
@@ -40,10 +40,6 @@ public class StatisticsUtil {
 		// utility class
 	}
 
-	public static void calculateAvg(final List<Long> durations) {
-
-	}
-
 	public static PerformanceResult printStatistics(final long overallDurationInNs, final List<TimestampObject> timestampObjects) {
 		PerformanceResult performanceResult = new PerformanceResult();
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
index 70ba4d7a..1b7186d6 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
@@ -26,7 +26,7 @@ public class Clock extends ProducerStage<Void, Long> {
 		}
 
 		// System.out.println("Emitting timestamp");
-		this.send(this.getCurrentTimeInNs());
+		this.getOutputPort().send(this.getCurrentTimeInNs());
 	}
 
 	private void sleep(final long delayInMs) {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java
index 91861108..3296c6ff 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java
@@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
@@ -17,8 +18,8 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
 
 	@Override
 	protected void execute5(final T element) {
-		Long trigger = this.triggerInputPort.receive();
-		if (trigger != null) {
+		Long timestampInNs = this.triggerInputPort.receive();
+		if (timestampInNs != null) {
 			this.computeThroughput();
 			this.resetTimestamp();
 		}
@@ -34,9 +35,10 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
 
 	private void computeThroughput() {
 		long diffInNs = System.nanoTime() - this.timestamp;
-		long throughput = this.numPassedElements / diffInNs;
-		// this.throughputs.add(throughput);
-		this.logger.info("Throughput: " + throughput + " ns");
+		long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+		long throughputPerMs = this.numPassedElements / diffInMs;
+		this.throughputs.add(throughputPerMs);
+		// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
 	}
 
 	private void resetTimestamp() {
@@ -49,7 +51,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
 	}
 
 	public InputPort<Long> getTriggerInputPort() {
-		return triggerInputPort;
+		return this.triggerInputPort;
 	}
 
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
index c200515b..65b9a562 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
@@ -7,13 +7,13 @@ import java.util.List;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Cache;
 import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
 import teetime.variant.methodcallWithPorts.stage.CountingFilter;
-import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
 import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
@@ -32,6 +32,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
 	private Thread producerThread;
+	private Thread clockThread;
 
 	private ClassNameRegistryRepository classNameRegistryRepository;
 
@@ -39,29 +40,28 @@ public class TraceReconstructionAnalysis extends Analysis {
 
 	private CountingFilter<TraceEventRecords> traceCounter;
 
-	private ThroughputFilter<TraceEventRecords> throughputFilter;
+	private ThroughputFilter<IFlowRecord> throughputFilter;
+
+	private File inputDir;
 
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<Void, Void> clockPipeline = this.buildClockPipeline();
-		this.producerThread = new Thread(new RunnableStage(clockPipeline));
+		StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
+		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockPipeline);
+		Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockStage);
 		this.producerThread = new Thread(new RunnableStage(producerPipeline));
 	}
 
-	private Pipeline<Void, Void> buildClockPipeline() {
+	private StageWithPort<Void, Long> buildClockPipeline() {
 		Clock clock = new Clock();
-		clock.setIntervalDelayInMs(1000);
+		clock.setIntervalDelayInMs(50);
 
-		Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
-		pipeline.setFirstStage(clock);
-		pipeline.setLastStage(new EndStage<Void>());
-		return pipeline;
+		return clock;
 	}
 
-	private Pipeline<File, Void> buildProducerPipeline(final Pipeline<Void, Void> clockPipeline) {
+	private Pipeline<File, Void> buildProducerPipeline(final StageWithPort<Void, Long> clockStage) {
 		this.classNameRegistryRepository = new ClassNameRegistryRepository();
 
 		// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
@@ -79,8 +79,8 @@ public class TraceReconstructionAnalysis extends Analysis {
 		// isOperationExecutionRecordTraceIdPredicate);
 		final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
 				IFlowRecord.class);
+		this.throughputFilter = new ThroughputFilter<IFlowRecord>();
 		final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
-		this.throughputFilter = new ThroughputFilter<TraceEventRecords>();
 		this.traceCounter = new CountingFilter<TraceEventRecords>();
 		final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
 
@@ -94,15 +94,16 @@ public class TraceReconstructionAnalysis extends Analysis {
 		SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort());
 		SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
 		SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
-		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.throughputFilter.getInputPort());
-		SingleElementPipe.connect(this.throughputFilter.getOutputPort(), this.traceCounter.getInputPort());
+		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort());
+		SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
 		SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort());
 
-		SpScPipe.connect(clockPipeline.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
+		SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
 
 		// fill input ports
-		dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs"));
+		dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir);
 
 		// create and configure pipeline
 		Pipeline<File, Void> pipeline = new Pipeline<File, Void>();
@@ -111,8 +112,8 @@ public class TraceReconstructionAnalysis extends Analysis {
 		pipeline.addIntermediateStage(cache);
 		pipeline.addIntermediateStage(stringBufferFilter);
 		pipeline.addIntermediateStage(instanceOfFilter);
-		pipeline.addIntermediateStage(traceReconstructionFilter);
 		pipeline.addIntermediateStage(this.throughputFilter);
+		pipeline.addIntermediateStage(traceReconstructionFilter);
 		pipeline.addIntermediateStage(this.traceCounter);
 		pipeline.setLastStage(collector);
 		return pipeline;
@@ -122,6 +123,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 	public void start() {
 		super.start();
 
+		this.clockThread.start();
 		this.producerThread.start();
 
 		try {
@@ -129,6 +131,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 		} catch (InterruptedException e) {
 			throw new IllegalStateException(e);
 		}
+		this.clockThread.interrupt();
 	}
 
 	public List<TraceEventRecords> getElementCollection() {
@@ -146,4 +149,12 @@ public class TraceReconstructionAnalysis extends Analysis {
 	public List<Long> getThroughputs() {
 		return this.throughputFilter.getThroughputs();
 	}
+
+	public File getInputDir() {
+		return inputDir;
+	}
+
+	public void setInputDir(File inputDir) {
+		this.inputDir = inputDir;
+	}
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysisTest.java
index 1bf63ab7..8cedeba2 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysisTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysisTest.java
@@ -17,12 +17,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import teetime.util.StatisticsUtil;
 import teetime.util.StopWatch;
 
 import kieker.analysis.plugin.filter.flow.TraceEventRecords;
@@ -48,8 +51,9 @@ public class TraceReconstructionAnalysisTest {
 	}
 
 	@Test
-	public void performAnalysis() {
+	public void performAnalysisWithEprintsLogs() {
 		final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
+		analysis.setInputDir(new File("src/test/data/Eprints-logs"));
 		analysis.init();
 
 		this.stopWatch.start();
@@ -63,13 +67,68 @@ public class TraceReconstructionAnalysisTest {
 		assertEquals(50002, analysis.getNumRecords());
 		assertEquals(2, analysis.getNumTraces());
 
-		assertEquals(2, analysis.getElementCollection().size());
-
 		TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
 		assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
 
 		TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
 		assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
+
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
+		System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
+	}
+
+	@Test
+	public void performAnalysisWithKiekerLogs() {
+		final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
+		analysis.setInputDir(new File("src/test/data/kieker-logs"));
+		analysis.init();
+
+		this.stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			this.stopWatch.end();
+			analysis.onTerminate();
+		}
+
+		assertEquals(1489902, analysis.getNumRecords());
+		assertEquals(24013, analysis.getNumTraces());
+
+		TraceEventRecords trace0 = analysis.getElementCollection().get(0);
+		assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId());
+
+		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
+		assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
+
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
+		System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
+	}
+
+	@Test
+	public void performAnalysisWithKieker2Logs() {
+		final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
+		analysis.setInputDir(new File("src/test/data/kieker2-logs"));
+		analysis.init();
+
+		this.stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			this.stopWatch.end();
+			analysis.onTerminate();
+		}
+
+		assertEquals(17371, analysis.getNumRecords());
+		assertEquals(22, analysis.getNumTraces());
+
+		TraceEventRecords trace0 = analysis.getElementCollection().get(0);
+		assertEquals(0, trace0.getTraceMetadata().getTraceId());
+
+		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
+		assertEquals(1, trace1.getTraceMetadata().getTraceId());
+
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
+		System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
 	}
 
 }
-- 
GitLab