Skip to content
Snippets Groups Projects
Commit 630507d1 authored by Christian Wulf's avatar Christian Wulf
Browse files

added ChwHomeTcpTraceReadingTest

parent 3018be26
No related branches found
No related tags found
No related merge requests found
Showing with 217 additions and 51 deletions
...@@ -35,24 +35,27 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { ...@@ -35,24 +35,27 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) { private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs; long diffInNs = timestampInNs - this.lastTimestampInNs;
// the minimum time granularity of the clock is ms
// BETTER use the TimeUnit of the clock long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerTimeUnit = -1; double throughputPerMs = (double) this.numPassedElements / diffInMs;
this.logger.info("Throughput: " + String.format("%.3f", throughputPerMs) + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) { // long throughputPerTimeUnit = -1;
throughputPerTimeUnit = this.numPassedElements / diffInSec; //
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements); // long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
} else { // if (diffInSec > 0) {
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); // throughputPerTimeUnit = this.numPassedElements / diffInSec;
if (diffInMs > 0) { // this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
throughputPerTimeUnit = this.numPassedElements / diffInMs; // } else {
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements); // long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
// if (diffInMs > 0) {
} // throughputPerTimeUnit = this.numPassedElements / diffInMs;
} // this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
//
this.throughputs.add(throughputPerTimeUnit); // }
// }
this.throughputs.add((long) throughputPerMs);
this.resetTimestamp(timestampInNs); this.resetTimestamp(timestampInNs);
} }
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
...@@ -10,17 +7,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; ...@@ -10,17 +7,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.EndStage;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class TcpTraceLogging extends Analysis { public class TcpTraceLogging extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread tcpThread; private Thread tcpThread;
private int numWorkerThreads;
@Override @Override
public void init() { public void init() {
super.init(); super.init();
...@@ -28,19 +20,6 @@ public class TcpTraceLogging extends Analysis { ...@@ -28,19 +20,6 @@ public class TcpTraceLogging extends Analysis {
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
} }
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
TCPReaderSink tcpReader = new TCPReaderSink();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override @Override
public void start() { public void start() {
super.start(); super.start();
...@@ -54,21 +33,21 @@ public class TcpTraceLogging extends Analysis { ...@@ -54,21 +33,21 @@ public class TcpTraceLogging extends Analysis {
} }
} }
public List<TraceEventRecords> getElementCollection() { private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
return this.elementCollection; TCPReaderSink tcpReader = new TCPReaderSink();
} EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
public int getNumWorkerThreads() { SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) { // create and configure pipeline
this.numWorkerThreads = numWorkerThreads; Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(endStage);
return pipeline;
} }
public static void main(final String[] args) { public static void main(final String[] args) {
final TcpTraceLogging analysis = new TcpTraceLogging(); final TcpTraceLogging analysis = new TcpTraceLogging();
analysis.setNumWorkerThreads(1);
analysis.init(); analysis.init();
try { try {
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReading;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwHomeTcpTraceReadingTest {
private static final int MIO = 1000000;
private static final int EXPECTED_NUM_TRACES = 10 * MIO;
private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final TcpTraceLoggingExtAnalysis analysis = new TcpTraceLoggingExtAnalysis();
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
}
}
package teetime.variant.methodcallWithPorts.examples.traceReading;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import kieker.common.record.IMonitoringRecord;
public class TcpTraceLoggingExtAnalysis extends Analysis {
private static final int MIO = 1000000;
private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
private Thread clockThread;
private Thread tcpThread;
private Counter<IMonitoringRecord> recordCounter;
private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clockStage = new Clock();
clockStage.setInitialDelayInMs(intervalDelayInMs);
clockStage.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
pipeline.setFirstStage(clockStage);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline(final StageWithPort<Void, Long> clockPipeline) {
TCPReader tcpReader = new TCPReader();
this.recordCounter = new Counter<IMonitoringRecord>();
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockPipeline.getOutputPort(), this.recordThroughputStage.getTriggerInputPort(), TCP_RELAY_MAX_SIZE);
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(this.recordThroughputStage);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockPipeline = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage<Void>(clockPipeline));
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(clockPipeline);
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
}
@Override
public void start() {
super.start();
this.tcpThread.start();
this.clockThread.start();
try {
this.tcpThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputStage.getThroughputs();
}
}
...@@ -42,7 +42,8 @@ import kieker.common.record.IMonitoringRecord; ...@@ -42,7 +42,8 @@ import kieker.common.record.IMonitoringRecord;
@FixMethodOrder(MethodSorters.NAME_ASCENDING) @FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
private static final int EXPECTED_NUM_TRACES = 1000000; private static final int MIO = 1000000;
private static final int EXPECTED_NUM_TRACES = 1 * MIO;
private StopWatch stopWatch; private StopWatch stopWatch;
......
...@@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata; ...@@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 2000000; private static final int MIO = 1000000;
private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
......
...@@ -39,7 +39,8 @@ import kieker.common.record.flow.trace.TraceMetadata; ...@@ -39,7 +39,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReductionAnalysisWithThreads extends Analysis { public class TcpTraceReductionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000; private static final int MIO = 1000000;
private static final int TCP_RELAY_MAX_SIZE = (int) (0.5 * MIO);
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment