From 41d3c43b4809d3e7fe8a39f79421494bde47b1df Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 8 Jul 2014 20:39:55 +0200
Subject: [PATCH] added ChwHomeTcpTraceReadingTest
---
.../ElementThroughputMeasuringStage.java | 39 +++----
.../examples/kiekerdays/TcpTraceLogging.java | 39 ++-----
.../ChwHomeTcpTraceReadingTest.java | 79 ++++++++++++++
.../TcpTraceLoggingExtAnalysis.java | 102 ++++++++++++++++++
...ReconstructionAnalysisWithThreadsTest.java | 3 +-
...raceReconstructionAnalysisWithThreads.java | 3 +-
.../TcpTraceReductionAnalysisWithThreads.java | 3 +-
7 files changed, 217 insertions(+), 51 deletions(-)
create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
index 956e13ab..9b742a05 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
@@ -35,24 +35,27 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
-
- // BETTER use the TimeUnit of the clock
- long throughputPerTimeUnit = -1;
-
- long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
- if (diffInSec > 0) {
- throughputPerTimeUnit = this.numPassedElements / diffInSec;
- this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
- } else {
- long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
- if (diffInMs > 0) {
- throughputPerTimeUnit = this.numPassedElements / diffInMs;
- this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
-
- }
- }
-
- this.throughputs.add(throughputPerTimeUnit);
+ // the minimum time granularity of the clock is ms
+ long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+ double throughputPerMs = (double) this.numPassedElements / diffInMs;
+ this.logger.info("Throughput: " + String.format("%.3f", throughputPerMs) + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
+
+ // long throughputPerTimeUnit = -1;
+ //
+ // long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
+ // if (diffInSec > 0) {
+ // throughputPerTimeUnit = this.numPassedElements / diffInSec;
+ // this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
+ // } else {
+ // long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+ // if (diffInMs > 0) {
+ // throughputPerTimeUnit = this.numPassedElements / diffInMs;
+ // this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
+ //
+ // }
+ // }
+
+ this.throughputs.add((long) throughputPerMs);
this.resetTimestamp(timestampInNs);
}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
index cfa1d8a2..34146490 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java
@@ -1,8 +1,5 @@
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
-import java.util.LinkedList;
-import java.util.List;
-
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
@@ -10,17 +7,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.EndStage;
-import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
public class TcpTraceLogging extends Analysis {
- private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
-
private Thread tcpThread;
- private int numWorkerThreads;
-
@Override
public void init() {
super.init();
@@ -28,19 +20,6 @@ public class TcpTraceLogging extends Analysis {
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
}
- private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
- TCPReaderSink tcpReader = new TCPReaderSink();
- EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
-
- SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
-
- // create and configure pipeline
- Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
- pipeline.setFirstStage(tcpReader);
- pipeline.setLastStage(endStage);
- return pipeline;
- }
-
@Override
public void start() {
super.start();
@@ -54,21 +33,21 @@ public class TcpTraceLogging extends Analysis {
}
}
- public List<TraceEventRecords> getElementCollection() {
- return this.elementCollection;
- }
+ private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
+ TCPReaderSink tcpReader = new TCPReaderSink();
+ EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
- public int getNumWorkerThreads() {
- return this.numWorkerThreads;
- }
+ SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
- public void setNumWorkerThreads(final int numWorkerThreads) {
- this.numWorkerThreads = numWorkerThreads;
+ // create and configure pipeline
+ Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+ pipeline.setFirstStage(tcpReader);
+ pipeline.setLastStage(endStage);
+ return pipeline;
}
public static void main(final String[] args) {
final TcpTraceLogging analysis = new TcpTraceLogging();
- analysis.setNumWorkerThreads(1);
analysis.init();
try {
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
new file mode 100644
index 00000000..e3d18e31
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java
@@ -0,0 +1,79 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.examples.traceReading;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import teetime.util.ListUtil;
+import teetime.util.StatisticsUtil;
+import teetime.util.StopWatch;
+
+/**
+ * @author Christian Wulf
+ *
+ * @since 1.10
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ChwHomeTcpTraceReadingTest {
+
+ private static final int MIO = 1000000;
+ private static final int EXPECTED_NUM_TRACES = 10 * MIO;
+ private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
+
+ private StopWatch stopWatch;
+
+ @Before
+ public void before() {
+ this.stopWatch = new StopWatch();
+ }
+
+ @After
+ public void after() {
+ long overallDurationInNs = this.stopWatch.getDurationInNs();
+ System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
+ }
+
+ @Test
+ public void performAnalysis() {
+ final TcpTraceLoggingExtAnalysis analysis = new TcpTraceLoggingExtAnalysis();
+ analysis.init();
+
+ this.stopWatch.start();
+ try {
+ analysis.start();
+ } finally {
+ this.stopWatch.end();
+ analysis.onTerminate();
+ }
+
+ List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
+ Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
+ System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
+
+ assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
+ }
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
new file mode 100644
index 00000000..08a004e6
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
@@ -0,0 +1,102 @@
+package teetime.variant.methodcallWithPorts.examples.traceReading;
+
+import java.util.List;
+
+import teetime.variant.explicitScheduling.framework.core.Analysis;
+import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.stage.Clock;
+import teetime.variant.methodcallWithPorts.stage.Counter;
+import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
+import teetime.variant.methodcallWithPorts.stage.EndStage;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
+import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
+
+import kieker.common.record.IMonitoringRecord;
+
+public class TcpTraceLoggingExtAnalysis extends Analysis {
+
+ private static final int MIO = 1000000;
+ private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
+
+ private Thread clockThread;
+ private Thread tcpThread;
+
+ private Counter<IMonitoringRecord> recordCounter;
+ private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
+
+ private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
+ Clock clockStage = new Clock();
+ clockStage.setInitialDelayInMs(intervalDelayInMs);
+ clockStage.setIntervalDelayInMs(intervalDelayInMs);
+ Distributor<Long> distributor = new Distributor<Long>();
+
+ SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
+
+ // create and configure pipeline
+ Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
+ pipeline.setFirstStage(clockStage);
+ pipeline.setLastStage(distributor);
+ return pipeline;
+ }
+
+ private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline(final StageWithPort<Void, Long> clockPipeline) {
+ TCPReader tcpReader = new TCPReader();
+ this.recordCounter = new Counter<IMonitoringRecord>();
+ this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
+ EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
+
+ SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
+ SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
+ SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
+
+ SpScPipe.connect(clockPipeline.getOutputPort(), this.recordThroughputStage.getTriggerInputPort(), TCP_RELAY_MAX_SIZE);
+
+ // create and configure pipeline
+ Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+ pipeline.setFirstStage(tcpReader);
+ pipeline.addIntermediateStage(this.recordCounter);
+ pipeline.addIntermediateStage(this.recordThroughputStage);
+ pipeline.setLastStage(endStage);
+ return pipeline;
+ }
+
+ @Override
+ public void init() {
+ super.init();
+
+ StageWithPort<Void, Long> clockPipeline = this.buildClockPipeline(1000);
+ this.clockThread = new Thread(new RunnableStage<Void>(clockPipeline));
+
+ StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(clockPipeline);
+ this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ this.tcpThread.start();
+ this.clockThread.start();
+
+ try {
+ this.tcpThread.join();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+
+ this.clockThread.interrupt();
+ }
+
+ public int getNumRecords() {
+ return this.recordCounter.getNumElementsPassed();
+ }
+
+ public List<Long> getRecordThroughputs() {
+ return this.recordThroughputStage.getThroughputs();
+ }
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
index db807107..147bed62 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -42,7 +42,8 @@ import kieker.common.record.IMonitoringRecord;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
- private static final int EXPECTED_NUM_TRACES = 1000000;
+ private static final int MIO = 1000000;
+ private static final int EXPECTED_NUM_TRACES = 1 * MIO;
private StopWatch stopWatch;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index 20a52ac9..1c59cc3b 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
- private static final int TCP_RELAY_MAX_SIZE = 2000000;
+ private static final int MIO = 1000000;
+ private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index c93ffd30..6c1df935 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -39,7 +39,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReductionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
- private static final int TCP_RELAY_MAX_SIZE = 500000;
+ private static final int MIO = 1000000;
+ private static final int TCP_RELAY_MAX_SIZE = (int) (0.5 * MIO);
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
--
GitLab