Skip to content
Snippets Groups Projects
Commit 6d154c78 authored by Christian Wulf's avatar Christian Wulf
Browse files

added tcp analysis

parent 229fe495
No related branches found
No related tags found
No related merge requests found
Showing
with 334 additions and 20 deletions
...@@ -11,6 +11,8 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; ...@@ -11,6 +11,8 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> { public class EndStage<T> implements StageWithPort<T, T> {
private final InputPort<T> inputPort = new InputPort<T>(this);
public int count; public int count;
public ConstructorClosure<?> closure; public ConstructorClosure<?> closure;
public List<Object> list = new LinkedList<Object>(); public List<Object> list = new LinkedList<Object>();
...@@ -46,8 +48,7 @@ public class EndStage<T> implements StageWithPort<T, T> { ...@@ -46,8 +48,7 @@ public class EndStage<T> implements StageWithPort<T, T> {
@Override @Override
public InputPort<T> getInputPort() { public InputPort<T> getInputPort() {
// TODO Auto-generated method stub return this.inputPort;
return null;
} }
@Override @Override
......
...@@ -35,10 +35,12 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { ...@@ -35,10 +35,12 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private void computeThroughput() { private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp; long diffInNs = System.nanoTime() - this.timestamp;
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); // long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs; // long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs); long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms"); long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s");
} }
private void resetTimestamp() { private void resetTimestamp() {
......
...@@ -19,14 +19,18 @@ import java.io.IOException; ...@@ -19,14 +19,18 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException; import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup; import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup; import kieker.common.util.registry.Lookup;
...@@ -46,6 +50,8 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { ...@@ -46,6 +50,8 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
private int port1 = 10133; private int port1 = 10133;
private int port2 = 10134; private int port2 = 10134;
private TCPStringReader tcpStringReader;
// @Override // implement onStop // @Override // implement onStop
// public void onPipelineStops() { // public void onPipelineStops() {
// super.logger.info("Shutdown of TCPReader requested."); // super.logger.info("Shutdown of TCPReader requested.");
...@@ -69,6 +75,13 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { ...@@ -69,6 +75,13 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
this.port2 = port2; this.port2 = port2;
} }
@Override
public void onStart() {
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
super.onStart();
}
@Override @Override
protected void execute5(final Void element) { protected void execute5(final Void element) {
ServerSocketChannel serversocket = null; ServerSocketChannel serversocket = null;
...@@ -120,6 +133,82 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { ...@@ -120,6 +133,82 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
} }
} }
} }
this.setReschedulable(false);
this.tcpStringReader.terminate();
}
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
private volatile boolean terminated = false; // NOPMD
private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
public void terminate() {
this.terminated = true;
this.readerThread.interrupt();
}
@Override
public void run() {
this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while ((socketChannel.read(buffer) != -1) && (!this.terminated)) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final ClosedByInterruptException ex) {
LOG.warn("Reader interrupted", ex);
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
} }
} }
} }
...@@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
} }
@Test @Test
...@@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l)))); assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l))));
} }
...@@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId()); assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
} }
} }
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class ChwWorkTcpTraceReconstructionAnalysisTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis();
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(21001, analysis.getNumRecords());
assertEquals(1000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
...@@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
} }
@Test @Test
...@@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
} }
...@@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId()); assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
} }
} }
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysis extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread clockThread;
private Thread clock2Thread;
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
return clock;
}
private StageWithPort<Void, Long> buildClock2Pipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(2000);
return clock;
}
private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
TCPReader tcpReader = new TCPReader();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), 1024);
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1);
// create and configure pipeline
Pipeline<Void, TraceEventRecords> pipeline = new Pipeline<Void, TraceEventRecords>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override
public void start() {
super.start();
this.clockThread.start();
this.clock2Thread.start();
this.workerThread.start();
try {
this.workerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
}
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputFilter.getThroughputs();
}
public List<Long> getTraceThroughputFilter() {
return this.traceThroughputFilter.getThroughputs();
}
}
...@@ -31,8 +31,8 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -31,8 +31,8 @@ public class TraceReconstructionAnalysis extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread producerThread;
private Thread clockThread; private Thread clockThread;
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
...@@ -50,8 +50,8 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -50,8 +50,8 @@ public class TraceReconstructionAnalysis extends Analysis {
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage)); this.clockThread = new Thread(new RunnableStage(clockStage));
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockStage); Pipeline<?, ?> pipeline = this.buildPipeline(clockStage);
this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.workerThread = new Thread(new RunnableStage(pipeline));
} }
private StageWithPort<Void, Long> buildClockPipeline() { private StageWithPort<Void, Long> buildClockPipeline() {
...@@ -61,7 +61,7 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -61,7 +61,7 @@ public class TraceReconstructionAnalysis extends Analysis {
return clock; return clock;
} }
private Pipeline<File, Void> buildProducerPipeline(final StageWithPort<Void, Long> clockStage) { private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository(); this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000); // final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
...@@ -124,10 +124,10 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -124,10 +124,10 @@ public class TraceReconstructionAnalysis extends Analysis {
super.start(); super.start();
this.clockThread.start(); this.clockThread.start();
this.producerThread.start(); this.workerThread.start();
try { try {
this.producerThread.join(); this.workerThread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
...@@ -151,10 +151,10 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -151,10 +151,10 @@ public class TraceReconstructionAnalysis extends Analysis {
} }
public File getInputDir() { public File getInputDir() {
return inputDir; return this.inputDir;
} }
public void setInputDir(File inputDir) { public void setInputDir(final File inputDir) {
this.inputDir = inputDir; this.inputDir = inputDir;
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment