Skip to content
Snippets Groups Projects
Commit 983102f3 authored by Christian Wulf's avatar Christian Wulf
Browse files

added tcp analysis

parent 9a2a11b5
Branches
Tags
No related merge requests found
Showing
with 334 additions and 20 deletions
......@@ -11,6 +11,8 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> {
private final InputPort<T> inputPort = new InputPort<T>(this);
public int count;
public ConstructorClosure<?> closure;
public List<Object> list = new LinkedList<Object>();
......@@ -46,8 +48,7 @@ public class EndStage<T> implements StageWithPort<T, T> {
@Override
public InputPort<T> getInputPort() {
// TODO Auto-generated method stub
return null;
return this.inputPort;
}
@Override
......
......@@ -35,10 +35,12 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
// long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
// long throughputPerMs = this.numPassedElements / diffInMs;
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s");
}
private void resetTimestamp() {
......
......@@ -19,14 +19,18 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
......@@ -46,6 +50,8 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
private int port1 = 10133;
private int port2 = 10134;
private TCPStringReader tcpStringReader;
// @Override // implement onStop
// public void onPipelineStops() {
// super.logger.info("Shutdown of TCPReader requested.");
......@@ -69,6 +75,13 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
this.port2 = port2;
}
@Override
public void onStart() {
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
super.onStart();
}
@Override
protected void execute5(final Void element) {
ServerSocketChannel serversocket = null;
......@@ -120,6 +133,82 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
}
}
}
this.setReschedulable(false);
this.tcpStringReader.terminate();
}
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
private volatile boolean terminated = false; // NOPMD
private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
public void terminate() {
this.terminated = true;
this.readerThread.interrupt();
}
@Override
public void run() {
this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while ((socketChannel.read(buffer) != -1) && (!this.terminated)) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final ClosedByInterruptException ex) {
LOG.warn("Reader interrupted", ex);
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
}
}
}
......@@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l))));
}
......@@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class ChwWorkTcpTraceReconstructionAnalysisTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis();
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(21001, analysis.getNumRecords());
assertEquals(1000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
......@@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysis extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread clockThread;
private Thread clock2Thread;
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
return clock;
}
private StageWithPort<Void, Long> buildClock2Pipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(2000);
return clock;
}
private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
TCPReader tcpReader = new TCPReader();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), 1024);
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1);
// create and configure pipeline
Pipeline<Void, TraceEventRecords> pipeline = new Pipeline<Void, TraceEventRecords>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override
public void start() {
super.start();
this.clockThread.start();
this.clock2Thread.start();
this.workerThread.start();
try {
this.workerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
}
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputFilter.getThroughputs();
}
public List<Long> getTraceThroughputFilter() {
return this.traceThroughputFilter.getThroughputs();
}
}
......@@ -31,8 +31,8 @@ public class TraceReconstructionAnalysis extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread producerThread;
private Thread clockThread;
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
......@@ -50,8 +50,8 @@ public class TraceReconstructionAnalysis extends Analysis {
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockStage);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
Pipeline<?, ?> pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
......@@ -61,7 +61,7 @@ public class TraceReconstructionAnalysis extends Analysis {
return clock;
}
private Pipeline<File, Void> buildProducerPipeline(final StageWithPort<Void, Long> clockStage) {
private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
......@@ -124,10 +124,10 @@ public class TraceReconstructionAnalysis extends Analysis {
super.start();
this.clockThread.start();
this.producerThread.start();
this.workerThread.start();
try {
this.producerThread.join();
this.workerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
......@@ -151,10 +151,10 @@ public class TraceReconstructionAnalysis extends Analysis {
}
public File getInputDir() {
return inputDir;
return this.inputDir;
}
public void setInputDir(File inputDir) {
public void setInputDir(final File inputDir) {
this.inputDir = inputDir;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment