diff --git a/conf/logging.properties b/conf/logging.properties index 6635420b22b3b527ef699ba56e3e870a82e3ebb2..ff701c0055e0cbafe3e5dbf6c53849ef3b755be5 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n #teetime.level = ALL #teetime.variant.methodcallWithPorts.framework.level = ALL -teetime.variant.methodcallWithPorts.framework.core.level = FINE -teetime.variant.methodcallWithPorts.stage.level = INFO +#teetime.variant.methodcallWithPorts.framework.core.level = FINE +#teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index 5146d97d5053c89c48b852fd94704f1333b93ad3..50688b496938faba3dd798c381fb4c0b9e6ae07e 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -1,5 +1,6 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -25,9 +26,11 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReconstruction extends Analysis { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); - private static final int TCP_RELAY_MAX_SIZE = 500000; + private static final int TCP_RELAY_MAX_SIZE = 100000; private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>(); private Thread tcpThread; private Thread[] workerThreads; @@ -62,8 +65,6 @@ public class TcpTraceReconstruction extends Analysis { return pipeline; } - private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); @@ -73,7 +74,8 @@ public class TcpTraceReconstruction extends Analysis { EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // connect stages - SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); @@ -109,6 +111,16 @@ public class TcpTraceReconstruction extends Analysis { } } + @Override + public void onTerminate() { + int maxSize = 0; + for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { + maxSize = Math.max(maxSize, pipe.getMaxSize()); + } + System.out.println("max size of TcpRelayPipes: " + maxSize); + super.onTerminate(); + } + public List<TraceEventRecords> getElementCollection() { return this.elementCollection; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index ca939ce17bf07af9905baba25804c0004f324182..fb5eb3c2da1a07f61710cae0a5958b9fd529429c 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -1,5 +1,6 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,9 +32,12 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReduction extends Analysis { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); - private static final int TCP_RELAY_MAX_SIZE = 500000; + private static final int TCP_RELAY_MAX_SIZE = 100000; private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); + private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>(); private Thread tcpThread; private Thread clockThread; @@ -87,9 +91,6 @@ public class TcpTraceReduction extends Analysis { return pipeline; } - private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); - private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, final StageWithPort<Void, Long> clockStage) { // create stages @@ -101,7 +102,8 @@ public class TcpTraceReduction extends Analysis { EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // connect stages - SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); @@ -143,6 +145,16 @@ public class TcpTraceReduction extends Analysis { this.clockThread.interrupt(); } + @Override + public void onTerminate() { + int maxSize = 0; + for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { + maxSize = Math.max(maxSize, pipe.getMaxSize()); + } + System.out.println("max size of TcpRelayPipes: " + maxSize); + super.onTerminate(); + } + public List<TraceEventRecords> getElementCollection() { return this.elementCollection; }