diff --git a/conf/logging.properties b/conf/logging.properties index 5f13ed5cdb89ca93fe341c97b9b64967cb7ca147..0c3fdb94be28e2c8d6214114b74e0a90df8abc6f 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -3,12 +3,12 @@ java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter -java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s) %6$s %n #teetime.level = ALL #teetime.variant.methodcallWithPorts.framework.level = ALL #teetime.variant.methodcallWithPorts.framework.core.level = FINE -#teetime.variant.methodcallWithPorts.stage.level = INFO +teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE diff --git a/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java b/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java index 42719a29fbdc42cc8b7682434e4288989368b6e0..8b147e41c6c57c77569cfe9125a5885fc033424a 100644 --- a/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java +++ b/src/main/java/teetime/variant/explicitScheduling/framework/core/Analysis.java @@ -18,7 +18,7 @@ package teetime.variant.explicitScheduling.framework.core; /** * @author Christian Wulf - * + * * @since 1.10 */ public class Analysis { @@ -32,6 +32,6 @@ public class Analysis { } public void onTerminate() { - // System.out.println("Analysis stopped."); + System.out.println("Analysis stopped."); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java new file mode 100644 index 0000000000000000000000000000000000000000..a49946d3938e3ab04a99da48dad76c3833990e29 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java @@ -0,0 +1,229 @@ +package teetime.variant.methodcallWithPorts.stage.explorviz; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.trace.operation.AfterOperationEvent; +import kieker.common.record.flow.trace.operation.BeforeOperationEvent; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; + +public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> { + + private static final int MESSAGE_BUFFER_SIZE = 65535; + + private static final byte HostApplicationMetaDataRecord_CLAZZ_ID = 0; + private static final byte BeforeOperationEventRecord_CLAZZ_ID = 1; + private static final byte AfterOperationEventRecord_CLAZZ_ID = 3; + private static final byte StringRegistryRecord_CLAZZ_ID = 4; + + private static final int HostApplicationMetaDataRecord_BYTE_LENGTH = 16; + private static final int BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH = 36; + private static final int AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH = 20; + + private final ILookup<String> stringRegistry = new Lookup<String>(); + + private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); + + private int port1 = 10133; + + public final int getPort1() { + return this.port1; + } + + public final void setPort1(final int port1) { + this.port1 = port1; + } + + @Override + protected void execute() { + ServerSocketChannel serversocket = null; + try { + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(this.port1)); + if (super.logger.isDebugEnabled()) { + super.logger.debug("Listening on port " + this.port1); + } + // BEGIN also loop this one? + final SocketChannel socketChannel = serversocket.accept(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + while (socketChannel.read(buffer) != -1) { + buffer.flip(); + // System.out.println("Reading, remaining:" + buffer.remaining()); + try { + while (buffer.hasRemaining()) { + buffer.mark(); + this.messagesfromByteArray(buffer); + } + buffer.clear(); + } catch (final BufferUnderflowException ex) { + buffer.reset(); + // System.out.println("Underflow, remaining:" + buffer.remaining()); + buffer.compact(); + } + } + // System.out.println("Channel closing..."); + socketChannel.close(); + // END also loop this one? + } catch (final IOException ex) { + super.logger.error("Error while reading", ex); + } finally { + if (null != serversocket) { + try { + serversocket.close(); + } catch (final IOException e) { + if (super.logger.isDebugEnabled()) { + super.logger.debug("Failed to close TCP connection!", e); + } + } + } + + this.setReschedulable(false); + } + } + + private final void messagesfromByteArray(final ByteBuffer buffer) { + final byte clazzId = buffer.get(); + switch (clazzId) { + case HostApplicationMetaDataRecord_CLAZZ_ID: { + if (buffer.remaining() >= HostApplicationMetaDataRecord_BYTE_LENGTH) { + this.readInHostApplicationMetaData(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case BeforeOperationEventRecord_CLAZZ_ID: { + if (buffer.remaining() >= BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH) { + this.readInBeforeOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case AfterOperationEventRecord_CLAZZ_ID: { + if (buffer.remaining() >= AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH) { + this.readInAfterOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case StringRegistryRecord_CLAZZ_ID: { + int mapId = 0; + int stringLength = 0; + if (buffer.remaining() >= 8) { + mapId = buffer.getInt(); + stringLength = buffer.getInt(); + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + + if (buffer.remaining() >= stringLength) { + final byte[] stringByteArray = new byte[stringLength]; + + buffer.get(stringByteArray); + + this.stringRegistry.set(new String(stringByteArray), mapId); + + this.checkWaitingMessages(); + } else { + buffer.position(buffer.position() - 9); + buffer.compact(); + return; + } + break; + } + default: { + System.out.println("unknown class id " + clazzId + " at offset " + + (buffer.position() - 1)); + return; + } + } + } + + private final void readInHostApplicationMetaData(final ByteBuffer buffer) { + final int systemnameId = buffer.getInt(); + final int ipaddressId = buffer.getInt(); + final int hostnameId = buffer.getInt(); + final int applicationId = buffer.getInt(); + // just consume; not necessary for kieker + } + + private final void readInBeforeOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int objectId = buffer.getInt(); + final int operationId = buffer.getInt(); + final int clazzId = buffer.getInt(); + final int interfaceId = buffer.getInt(); + + final String operation = this.stringRegistry.get(operationId); + final String clazz = this.stringRegistry.get(clazzId); + if (operation == null || clazz == null) { + this.putInWaitingMessages(buffer, BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH + 1); + return; + } + + final IMonitoringRecord record = new BeforeOperationEvent(timestamp, traceId, orderIndex, operation, clazz); + this.send(this.outputPort, record); + } + + private final void readInAfterOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + + final IMonitoringRecord record = new AfterOperationEvent(timestamp, traceId, orderIndex, null, null); + this.send(this.outputPort, record); + } + + private final void putInWaitingMessages(final ByteBuffer buffer, final int length) { + final byte[] message = new byte[length]; + buffer.position(buffer.position() - length); + buffer.get(message); + this.waitingForStringMessages.add(message); + } + + private final void checkWaitingMessages() { + final List<byte[]> localWaitingList = new ArrayList<byte[]>(); + for (final byte[] waitingMessage : this.waitingForStringMessages) { + localWaitingList.add(waitingMessage); + } + this.waitingForStringMessages.clear(); + + for (final byte[] waitingMessage : localWaitingList) { + final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); + final byte waitingMessageClazzId = buffer.get(); + switch (waitingMessageClazzId) { + case HostApplicationMetaDataRecord_CLAZZ_ID: + this.readInHostApplicationMetaData(buffer); + break; + case BeforeOperationEventRecord_CLAZZ_ID: + this.readInBeforeOperationEvent(buffer); + break; + case AfterOperationEventRecord_CLAZZ_ID: + this.readInAfterOperationEvent(buffer); + break; + default: + break; + } + } + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 3229fe7158bf717e2aca1dc5d45fba81a1128f37..f6489819a7d2609e9713b78aa823300a4b2dab4e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -41,9 +41,9 @@ import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ public class TCPReader extends ProducerStage<IMonitoringRecord> { @@ -140,17 +140,7 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { try { while (buffer.hasRemaining()) { buffer.mark(); - final int clazzid = buffer.getInt(); - final long loggingTimestamp = buffer.getLong(); - final IMonitoringRecord record; - try { // NOCS (Nested try-catch) - // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); - record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); - record.setLoggingTimestamp(loggingTimestamp); - this.send(this.outputPort, record); - } catch (final MonitoringRecordException ex) { - super.logger.error("Failed to create record.", ex); - } + this.createAndSendRecord(buffer); } buffer.clear(); } catch (final BufferUnderflowException ex) { @@ -180,10 +170,24 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } + private void createAndSendRecord(final ByteBuffer buffer) { + final int clazzid = buffer.getInt(); + final long loggingTimestamp = buffer.getLong(); + final IMonitoringRecord record; + try { // NOCS (Nested try-catch) + // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); + record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + record.setLoggingTimestamp(loggingTimestamp); + this.send(this.outputPort, record); + } catch (final MonitoringRecordException ex) { + super.logger.error("Failed to create record.", ex); + } + } + /** - * + * * @author Jan Waller - * + * * @since 1.8 */ private static class TCPStringReader extends Thread { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java new file mode 100644 index 0000000000000000000000000000000000000000..c134308ffa1690c30e3be8cfd5a34f7e23c55244 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java @@ -0,0 +1,179 @@ +package teetime.variant.methodcallWithPorts.examples.kiekerdays; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.stage.CollectorSink; +import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.registry.IMonitoringRecordReceiver; +import kieker.common.util.registry.Registry; + +public class KiekerLoadDriver { + + private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); + private final RunnableStage runnableStage; + private long[] timings; + + public KiekerLoadDriver(final File directory) { + StageWithPort producerPipeline = this.buildProducerPipeline(directory); + this.runnableStage = new RunnableStage(producerPipeline); + } + + private StageWithPort buildProducerPipeline(final File directory) { + ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); + // create stages + Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); + CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); + + final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); + pipeline.setFirstStage(dir2RecordsFilter); + pipeline.setLastStage(collector); + + dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); + SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + + dir2RecordsFilter.getInputPort().getPipe().add(directory); + + return pipeline; + } + + public Collection<IMonitoringRecord> load() { + this.runnableStage.run(); + return this.elementCollection; + } + + private static class RecordReceiver implements IMonitoringRecordReceiver { + + private final Registry<String> stringRegistry; + private final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.MAX_VALUE * 10); + private SocketChannel socketChannel; + + public RecordReceiver(final Registry<String> stringRegistry) throws IOException { + this.stringRegistry = stringRegistry; + } + + @Override + public boolean newMonitoringRecord(final IMonitoringRecord record) { + System.out.println("Registering " + record); + record.writeBytes(this.buffer, this.stringRegistry); + this.buffer.flip(); + try { + int writtenBytes = this.socketChannel.write(this.buffer); + System.out.println("writtenBytes: " + writtenBytes); + this.buffer.clear(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return true; + } + + public void connect() throws IOException { + String hostname = "localhost"; + int port = 10134; + System.out.println("Connecting to " + hostname + ":" + port); + + this.socketChannel = SocketChannel.open(); + this.socketChannel.connect(new InetSocketAddress(hostname, port)); + } + + // public void sendRegistryRecords() throws IOException { + // String hostname = "localhost"; + // int port = 10134; + // System.out.println("Connecting to " + hostname + ":" + port); + // + // SocketChannel socketChannel = SocketChannel.open(); + // try { + // socketChannel.connect(new InetSocketAddress(hostname, port)); + // this.buffer.flip(); + // socketChannel.write(this.buffer); + // } finally { + // socketChannel.close(); + // } + // } + + public void close() throws IOException { + this.socketChannel.close(); + } + + } + + public static void main(final String[] args) throws IOException { + final File directory = new File(args[0]); + final File outputFile = new File(args[1]); + final int runs = Integer.parseInt(args[2]); + + KiekerLoadDriver kiekerLoadDriver = new KiekerLoadDriver(directory); + kiekerLoadDriver.timings = new long[runs]; + Collection<IMonitoringRecord> records = kiekerLoadDriver.load(); + + final Registry<String> stringRegistry = new Registry<String>(); + ByteBuffer recordBuffer = ByteBuffer.allocateDirect(Short.MAX_VALUE); + + RecordReceiver recordReceiver = new RecordReceiver(stringRegistry); + stringRegistry.setRecordReceiver(recordReceiver); + try { + recordReceiver.connect(); + + for (IMonitoringRecord record : records) { + int clazzId = stringRegistry.get(record.getClass().getName()); + recordBuffer.putInt(clazzId); + recordBuffer.putLong(record.getLoggingTimestamp()); + record.writeBytes(recordBuffer, stringRegistry); + } + + String hostname = "localhost"; + int port = 10133; + System.out.println("Connecting to " + hostname + ":" + port); + + SocketChannel socketChannel = SocketChannel.open(); + try { + socketChannel.connect(new InetSocketAddress(hostname, port)); + for (int i = 0; i < runs; i++) { + recordBuffer.flip(); + // System.out.println("position: " + recordBuffer.position()); + // System.out.println("limit: " + recordBuffer.limit()); + long start_ns = System.nanoTime(); + int writtenBytes = socketChannel.write(recordBuffer); + long stop_ns = System.nanoTime(); + kiekerLoadDriver.timings[i] = stop_ns - start_ns; + if ((i % 100000) == 0) { + System.out.println(i); // NOPMD (System.out) + } + // System.out.println("writtenBytes (record): " + writtenBytes); + } + } finally { + socketChannel.close(); + } + + } finally { + recordReceiver.close(); + } + + PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8"); + try { + for (long timing : kiekerLoadDriver.timings) { + ps.println("KiekerLoadDriver;" + timing); + } + } finally { + ps.close(); + } + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java index fe7e3ead6b1ad5490608883e46ae017d0de010d6..7bfec10e16795ddc53492e555ea07cd9ecb2be0d 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java @@ -39,9 +39,9 @@ import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { @@ -156,9 +156,9 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } /** - * + * * @author Jan Waller - * + * * @since 1.8 */ private static class TCPStringReader extends Thread { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java new file mode 100644 index 0000000000000000000000000000000000000000..4e141ed0dd982eef61179f39dcf432c034fc3632 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -0,0 +1,61 @@ +package teetime.variant.methodcallWithPorts.examples.kiekerdays; + +import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; +import teetime.variant.methodcallWithPorts.stage.explorviz.KiekerRecordTcpReader; + +import kieker.common.record.IMonitoringRecord; + +public class TcpTraceLoggingExplorviz extends Analysis { + + private Thread tcpThread; + + @Override + public void init() { + super.init(); + StageWithPort tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + } + + @Override + public void start() { + super.start(); + + this.tcpThread.start(); + + try { + this.tcpThread.join(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private StageWithPort buildTcpPipeline() { + KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader(); + Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); + + SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); + + // create and configure pipeline + Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); + pipeline.setFirstStage(tcpReader); + pipeline.setLastStage(endStage); + return tcpReader; + } + + public static void main(final String[] args) { + final TcpTraceLoggingExplorviz analysis = new TcpTraceLoggingExplorviz(); + + analysis.init(); + try { + analysis.start(); + } finally { + analysis.onTerminate(); + } + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 0a228f586cd9e5774a1141eadabd55f4ea229949..dd178e770462daf6b0ce336ab142f005b67b65e6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -49,6 +49,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(this.recordCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); @@ -56,7 +57,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(this.recordThroughputStage); + // pipeline.addIntermediateStage(this.recordThroughputStage); pipeline.setLastStage(endStage); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index a26578dea46c53fae8c302554b8c0239d72c9092..58c3fc74041d58281dccc9ea2ed29013b2362ab6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -84,12 +84,12 @@ public class TcpTraceReconstructionAnalysis extends Analysis { // connect stages SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); + SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); @@ -100,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(instanceOfFilter); - pipeline.addIntermediateStage(this.recordThroughputFilter); + // pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); - // pipeline.addIntermediateStage(this.traceThroughputFilter); + pipeline.addIntermediateStage(this.traceThroughputFilter); pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; @@ -113,8 +113,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis { super.start(); this.workerThread.start(); - this.clockThread.start(); - // this.clock2Thread.start(); + // this.clockThread.start(); + this.clock2Thread.start(); try { this.workerThread.join(); @@ -122,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { throw new IllegalStateException(e); } this.clockThread.interrupt(); - // this.clock2Thread.interrupt(); + this.clock2Thread.interrupt(); } public List<TraceEventRecords> getElementCollection() {