From 21c4a432c8895af44df1a7f8141eb986965e20d2 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 6 Aug 2014 15:39:41 +0200 Subject: [PATCH] added tcp load driver for kieker records --- conf/logging.properties | 4 +- .../framework/core/Analysis.java | 4 +- .../explorviz/KiekerRecordTcpReader.java | 229 ++++++++++++++++++ .../stage/io/TCPReader.java | 34 +-- ...er-20140617-075121652-UTC-000-Thread-2.dat | 16 ++ src/test/data/load-logs/kieker.map | 6 + .../examples/kiekerdays/KiekerLoadDriver.java | 179 ++++++++++++++ .../examples/kiekerdays/TCPReaderSink.java | 8 +- .../kiekerdays/TcpTraceLoggingExplorviz.java | 61 +++++ .../TcpTraceLoggingExtAnalysis.java | 3 +- .../TcpTraceReconstructionAnalysis.java | 22 +- 11 files changed, 531 insertions(+), 35 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java create mode 100644 src/test/data/load-logs/kieker-20140617-075121652-UTC-000-Thread-2.dat create mode 100644 src/test/data/load-logs/kieker.map create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java diff --git a/conf/logging.properties b/conf/logging.properties index 5f13ed5..0c3fdb9 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -3,12 +3,12 @@ java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter -java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s) %6$s %n #teetime.level = ALL #teetime.variant.methodcallWithPorts.framework.level = ALL #teetime.variant.methodcallWithPorts.framework.core.level = FINE -#teetime.variant.methodcallWithPorts.stage.level = INFO +teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE diff --git a/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java b/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java index 42719a2..8b147e4 100644 --- a/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java +++ b/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java @@ -18,7 +18,7 @@ package teetime.variant.explicitScheduling.framework.core; /** * @author Christian Wulf - * + * * @since 1.10 */ public class Analysis { @@ -32,6 +32,6 @@ public class Analysis { } public void onTerminate() { - // System.out.println("Analysis stopped."); + System.out.println("Analysis stopped."); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java new file mode 100644 index 0000000..a49946d --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java @@ -0,0 +1,229 @@ +package teetime.variant.methodcallWithPorts.stage.explorviz; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.trace.operation.AfterOperationEvent; +import kieker.common.record.flow.trace.operation.BeforeOperationEvent; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; + +public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> { + + private static final int MESSAGE_BUFFER_SIZE = 65535; + + private static final byte HostApplicationMetaDataRecord_CLAZZ_ID = 0; + private static final byte BeforeOperationEventRecord_CLAZZ_ID = 1; + private static final byte AfterOperationEventRecord_CLAZZ_ID = 3; + private static final byte StringRegistryRecord_CLAZZ_ID = 4; + + private static final int HostApplicationMetaDataRecord_BYTE_LENGTH = 16; + private static final int BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH = 36; + private static final int AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH = 20; + + private final ILookup<String> stringRegistry = new Lookup<String>(); + + private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); + + private int port1 = 10133; + + public final int getPort1() { + return this.port1; + } + + public final void setPort1(final int port1) { + this.port1 = port1; + } + + @Override + protected void execute() { + ServerSocketChannel serversocket = null; + try { + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(this.port1)); + if (super.logger.isDebugEnabled()) { + super.logger.debug("Listening on port " + this.port1); + } + // BEGIN also loop this one? + final SocketChannel socketChannel = serversocket.accept(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + while (socketChannel.read(buffer) != -1) { + buffer.flip(); + // System.out.println("Reading, remaining:" + buffer.remaining()); + try { + while (buffer.hasRemaining()) { + buffer.mark(); + this.messagesfromByteArray(buffer); + } + buffer.clear(); + } catch (final BufferUnderflowException ex) { + buffer.reset(); + // System.out.println("Underflow, remaining:" + buffer.remaining()); + buffer.compact(); + } + } + // System.out.println("Channel closing..."); + socketChannel.close(); + // END also loop this one? + } catch (final IOException ex) { + super.logger.error("Error while reading", ex); + } finally { + if (null != serversocket) { + try { + serversocket.close(); + } catch (final IOException e) { + if (super.logger.isDebugEnabled()) { + super.logger.debug("Failed to close TCP connection!", e); + } + } + } + + this.setReschedulable(false); + } + } + + private final void messagesfromByteArray(final ByteBuffer buffer) { + final byte clazzId = buffer.get(); + switch (clazzId) { + case HostApplicationMetaDataRecord_CLAZZ_ID: { + if (buffer.remaining() >= HostApplicationMetaDataRecord_BYTE_LENGTH) { + this.readInHostApplicationMetaData(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case BeforeOperationEventRecord_CLAZZ_ID: { + if (buffer.remaining() >= BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH) { + this.readInBeforeOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case AfterOperationEventRecord_CLAZZ_ID: { + if (buffer.remaining() >= AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH) { + this.readInAfterOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case StringRegistryRecord_CLAZZ_ID: { + int mapId = 0; + int stringLength = 0; + if (buffer.remaining() >= 8) { + mapId = buffer.getInt(); + stringLength = buffer.getInt(); + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + + if (buffer.remaining() >= stringLength) { + final byte[] stringByteArray = new byte[stringLength]; + + buffer.get(stringByteArray); + + this.stringRegistry.set(new String(stringByteArray), mapId); + + this.checkWaitingMessages(); + } else { + buffer.position(buffer.position() - 9); + buffer.compact(); + return; + } + break; + } + default: { + System.out.println("unknown class id " + clazzId + " at offset " + + (buffer.position() - 1)); + return; + } + } + } + + private final void readInHostApplicationMetaData(final ByteBuffer buffer) { + final int systemnameId = buffer.getInt(); + final int ipaddressId = buffer.getInt(); + final int hostnameId = buffer.getInt(); + final int applicationId = buffer.getInt(); + // just consume; not necessary for kieker + } + + private final void readInBeforeOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int objectId = buffer.getInt(); + final int operationId = buffer.getInt(); + final int clazzId = buffer.getInt(); + final int interfaceId = buffer.getInt(); + + final String operation = this.stringRegistry.get(operationId); + final String clazz = this.stringRegistry.get(clazzId); + if (operation == null || clazz == null) { + this.putInWaitingMessages(buffer, BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH + 1); + return; + } + + final IMonitoringRecord record = new BeforeOperationEvent(timestamp, traceId, orderIndex, operation, clazz); + this.send(this.outputPort, record); + } + + private final void readInAfterOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + + final IMonitoringRecord record = new AfterOperationEvent(timestamp, traceId, orderIndex, null, null); + this.send(this.outputPort, record); + } + + private final void putInWaitingMessages(final ByteBuffer buffer, final int length) { + final byte[] message = new byte[length]; + buffer.position(buffer.position() - length); + buffer.get(message); + this.waitingForStringMessages.add(message); + } + + private final void checkWaitingMessages() { + final List<byte[]> localWaitingList = new ArrayList<byte[]>(); + for (final byte[] waitingMessage : this.waitingForStringMessages) { + localWaitingList.add(waitingMessage); + } + this.waitingForStringMessages.clear(); + + for (final byte[] waitingMessage : localWaitingList) { + final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); + final byte waitingMessageClazzId = buffer.get(); + switch (waitingMessageClazzId) { + case HostApplicationMetaDataRecord_CLAZZ_ID: + this.readInHostApplicationMetaData(buffer); + break; + case BeforeOperationEventRecord_CLAZZ_ID: + this.readInBeforeOperationEvent(buffer); + break; + case AfterOperationEventRecord_CLAZZ_ID: + this.readInAfterOperationEvent(buffer); + break; + default: + break; + } + } + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 3229fe7..f648981 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -41,9 +41,9 @@ import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ public class TCPReader extends ProducerStage<IMonitoringRecord> { @@ -140,17 +140,7 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { try { while (buffer.hasRemaining()) { buffer.mark(); - final int clazzid = buffer.getInt(); - final long loggingTimestamp = buffer.getLong(); - final IMonitoringRecord record; - try { // NOCS (Nested try-catch) - // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); - record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); - record.setLoggingTimestamp(loggingTimestamp); - this.send(this.outputPort, record); - } catch (final MonitoringRecordException ex) { - super.logger.error("Failed to create record.", ex); - } + this.createAndSendRecord(buffer); } buffer.clear(); } catch (final BufferUnderflowException ex) { @@ -180,10 +170,24 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } + private void createAndSendRecord(final ByteBuffer buffer) { + final int clazzid = buffer.getInt(); + final long loggingTimestamp = buffer.getLong(); + final IMonitoringRecord record; + try { // NOCS (Nested try-catch) + // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); + record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + record.setLoggingTimestamp(loggingTimestamp); + this.send(this.outputPort, record); + } catch (final MonitoringRecordException ex) { + super.logger.error("Failed to create record.", ex); + } + } + /** - * + * * @author Jan Waller - * + * * @since 1.8 */ private static class TCPStringReader extends Thread { diff --git a/src/test/data/load-logs/kieker-20140617-075121652-UTC-000-Thread-2.dat b/src/test/data/load-logs/kieker-20140617-075121652-UTC-000-Thread-2.dat new file mode 100644 index 0000000..c42a470 --- /dev/null +++ b/src/test/data/load-logs/kieker-20140617-075121652-UTC-000-Thread-2.dat @@ -0,0 +1,16 @@ +$0;1402991481659588369;1.10-SNAPSHOT;KICKER;Osterinsel;1;true;0;NANOSECONDS;1 +$1;1402991481658138502;0;1;<no-session-id>;Osterinsel;0;-1 +$2;1402991481672479079;1402991481672457521;0;0;private static final kieker.common.logging.LogFactory$Logger kieker.common.logging.LogFactory.detectLogger();kieker.common.logging.LogFactory;0 +$3;1402991481678949585;1402991481678929054;0;1;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;296604026 +$4;1402991481684201204;1402991481684181015;0;2;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;296604026 +$3;1402991481684246373;1402991481684243636;0;3;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;118995868 +$4;1402991481684252533;1402991481684250822;0;4;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;118995868 +$3;1402991481684267247;1402991481684265536;0;5;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;656278119 +$4;1402991481684273064;1402991481684271353;0;6;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;656278119 +$3;1402991481684288121;1402991481684286068;0;7;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;590335041 +$4;1402991481684293596;1402991481684291885;0;8;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;590335041 +$3;1402991481684308310;1402991481684306257;0;9;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;386584947 +$4;1402991481684313785;1402991481684312074;0;10;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;386584947 +$3;1402991481684328157;1402991481684326446;0;11;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;1195172054 +$4;1402991481684333975;1402991481684332264;0;12;private kieker.common.logging.LogFactory$Logger.<init>(java.lang.String, int);kieker.common.logging.LogFactory$Logger;1195172054 +$5;1402991481690701139;1402991481690687109;0;13;private static final kieker.common.logging.LogFactory$Logger kieker.common.logging.LogFactory.detectLogger();kieker.common.logging.LogFactory;0 diff --git a/src/test/data/load-logs/kieker.map b/src/test/data/load-logs/kieker.map new file mode 100644 index 0000000..dcb7f6b --- /dev/null +++ b/src/test/data/load-logs/kieker.map @@ -0,0 +1,6 @@ +$0=kieker.common.record.misc.KiekerMetadataRecord +$1=kieker.common.record.flow.trace.TraceMetadata +$2=kieker.common.record.flow.trace.operation.object.BeforeOperationObjectEvent +$3=kieker.common.record.flow.trace.operation.constructor.object.BeforeConstructorObjectEvent +$4=kieker.common.record.flow.trace.operation.constructor.object.AfterConstructorObjectEvent +$5=kieker.common.record.flow.trace.operation.object.AfterOperationObjectEvent diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java new file mode 100644 index 0000000..c134308 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java @@ -0,0 +1,179 @@ +package teetime.variant.methodcallWithPorts.examples.kiekerdays; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.stage.CollectorSink; +import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.registry.IMonitoringRecordReceiver; +import kieker.common.util.registry.Registry; + +public class KiekerLoadDriver { + + private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); + private final RunnableStage runnableStage; + private long[] timings; + + public KiekerLoadDriver(final File directory) { + StageWithPort producerPipeline = this.buildProducerPipeline(directory); + this.runnableStage = new RunnableStage(producerPipeline); + } + + private StageWithPort buildProducerPipeline(final File directory) { + ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); + // create stages + Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); + CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); + + final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); + pipeline.setFirstStage(dir2RecordsFilter); + pipeline.setLastStage(collector); + + dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); + SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + + dir2RecordsFilter.getInputPort().getPipe().add(directory); + + return pipeline; + } + + public Collection<IMonitoringRecord> load() { + this.runnableStage.run(); + return this.elementCollection; + } + + private static class RecordReceiver implements IMonitoringRecordReceiver { + + private final Registry<String> stringRegistry; + private final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.MAX_VALUE * 10); + private SocketChannel socketChannel; + + public RecordReceiver(final Registry<String> stringRegistry) throws IOException { + this.stringRegistry = stringRegistry; + } + + @Override + public boolean newMonitoringRecord(final IMonitoringRecord record) { + System.out.println("Registering " + record); + record.writeBytes(this.buffer, this.stringRegistry); + this.buffer.flip(); + try { + int writtenBytes = this.socketChannel.write(this.buffer); + System.out.println("writtenBytes: " + writtenBytes); + this.buffer.clear(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return true; + } + + public void connect() throws IOException { + String hostname = "localhost"; + int port = 10134; + System.out.println("Connecting to " + hostname + ":" + port); + + this.socketChannel = SocketChannel.open(); + this.socketChannel.connect(new InetSocketAddress(hostname, port)); + } + + // public void sendRegistryRecords() throws IOException { + // String hostname = "localhost"; + // int port = 10134; + // System.out.println("Connecting to " + hostname + ":" + port); + // + // SocketChannel socketChannel = SocketChannel.open(); + // try { + // socketChannel.connect(new InetSocketAddress(hostname, port)); + // this.buffer.flip(); + // socketChannel.write(this.buffer); + // } finally { + // socketChannel.close(); + // } + // } + + public void close() throws IOException { + this.socketChannel.close(); + } + + } + + public static void main(final String[] args) throws IOException { + final File directory = new File(args[0]); + final File outputFile = new File(args[1]); + final int runs = Integer.parseInt(args[2]); + + KiekerLoadDriver kiekerLoadDriver = new KiekerLoadDriver(directory); + kiekerLoadDriver.timings = new long[runs]; + Collection<IMonitoringRecord> records = kiekerLoadDriver.load(); + + final Registry<String> stringRegistry = new Registry<String>(); + ByteBuffer recordBuffer = ByteBuffer.allocateDirect(Short.MAX_VALUE); + + RecordReceiver recordReceiver = new RecordReceiver(stringRegistry); + stringRegistry.setRecordReceiver(recordReceiver); + try { + recordReceiver.connect(); + + for (IMonitoringRecord record : records) { + int clazzId = stringRegistry.get(record.getClass().getName()); + recordBuffer.putInt(clazzId); + recordBuffer.putLong(record.getLoggingTimestamp()); + record.writeBytes(recordBuffer, stringRegistry); + } + + String hostname = "localhost"; + int port = 10133; + System.out.println("Connecting to " + hostname + ":" + port); + + SocketChannel socketChannel = SocketChannel.open(); + try { + socketChannel.connect(new InetSocketAddress(hostname, port)); + for (int i = 0; i < runs; i++) { + recordBuffer.flip(); + // System.out.println("position: " + recordBuffer.position()); + // System.out.println("limit: " + recordBuffer.limit()); + long start_ns = System.nanoTime(); + int writtenBytes = socketChannel.write(recordBuffer); + long stop_ns = System.nanoTime(); + kiekerLoadDriver.timings[i] = stop_ns - start_ns; + if ((i % 100000) == 0) { + System.out.println(i); // NOPMD (System.out) + } + // System.out.println("writtenBytes (record): " + writtenBytes); + } + } finally { + socketChannel.close(); + } + + } finally { + recordReceiver.close(); + } + + PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8"); + try { + for (long timing : kiekerLoadDriver.timings) { + ps.println("KiekerLoadDriver;" + timing); + } + } finally { + ps.close(); + } + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java index fe7e3ea..7bfec10 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java @@ -39,9 +39,9 @@ import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { @@ -156,9 +156,9 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } /** - * + * * @author Jan Waller - * + * * @since 1.8 */ private static class TCPStringReader extends Thread { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java new file mode 100644 index 0000000..4e141ed --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -0,0 +1,61 @@ +package teetime.variant.methodcallWithPorts.examples.kiekerdays; + +import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; +import teetime.variant.methodcallWithPorts.stage.explorviz.KiekerRecordTcpReader; + +import kieker.common.record.IMonitoringRecord; + +public class TcpTraceLoggingExplorviz extends Analysis { + + private Thread tcpThread; + + @Override + public void init() { + super.init(); + StageWithPort tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + } + + @Override + public void start() { + super.start(); + + this.tcpThread.start(); + + try { + this.tcpThread.join(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private StageWithPort buildTcpPipeline() { + KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader(); + Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); + + SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); + + // create and configure pipeline + Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); + pipeline.setFirstStage(tcpReader); + pipeline.setLastStage(endStage); + return tcpReader; + } + + public static void main(final String[] args) { + final TcpTraceLoggingExplorviz analysis = new TcpTraceLoggingExplorviz(); + + analysis.init(); + try { + analysis.start(); + } finally { + analysis.onTerminate(); + } + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 0a228f5..dd178e7 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -49,6 +49,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(this.recordCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); @@ -56,7 +57,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(this.recordThroughputStage); + // pipeline.addIntermediateStage(this.recordThroughputStage); pipeline.setLastStage(endStage); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index a26578d..58c3fc7 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -84,12 +84,12 @@ public class TcpTraceReconstructionAnalysis extends Analysis { // connect stages SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); + SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); @@ -100,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(instanceOfFilter); - pipeline.addIntermediateStage(this.recordThroughputFilter); + // pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); - // pipeline.addIntermediateStage(this.traceThroughputFilter); + pipeline.addIntermediateStage(this.traceThroughputFilter); pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; @@ -113,8 +113,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis { super.start(); this.workerThread.start(); - this.clockThread.start(); - // this.clock2Thread.start(); + // this.clockThread.start(); + this.clock2Thread.start(); try { this.workerThread.join(); @@ -122,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { throw new IllegalStateException(e); } this.clockThread.interrupt(); - // this.clock2Thread.interrupt(); + this.clock2Thread.interrupt(); } public List<TraceEventRecords> getElementCollection() { -- GitLab