Skip to content
Snippets Groups Projects
Commit a5ebbd2a authored by Christian Wulf's avatar Christian Wulf
Browse files

added sysout maxSize for SpSc pipe in KiekerDays experiments

parent 9e0f8213
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n ...@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL #teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.level = ALL #teetime.variant.methodcallWithPorts.framework.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = FINE #teetime.variant.methodcallWithPorts.framework.core.level = FINE
teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -25,9 +26,11 @@ import kieker.common.record.flow.IFlowRecord; ...@@ -25,9 +26,11 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstruction extends Analysis { public class TcpTraceReconstruction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000; private static final int TCP_RELAY_MAX_SIZE = 100000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>();
private Thread tcpThread; private Thread tcpThread;
private Thread[] workerThreads; private Thread[] workerThreads;
...@@ -62,8 +65,6 @@ public class TcpTraceReconstruction extends Analysis { ...@@ -62,8 +65,6 @@ public class TcpTraceReconstruction extends Analysis {
return pipeline; return pipeline;
} }
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) { private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) {
// create stages // create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
...@@ -73,7 +74,8 @@ public class TcpTraceReconstruction extends Analysis { ...@@ -73,7 +74,8 @@ public class TcpTraceReconstruction extends Analysis {
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages // connect stages
SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
...@@ -109,6 +111,16 @@ public class TcpTraceReconstruction extends Analysis { ...@@ -109,6 +111,16 @@ public class TcpTraceReconstruction extends Analysis {
} }
} }
@Override
public void onTerminate() {
int maxSize = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
super.onTerminate();
}
public List<TraceEventRecords> getElementCollection() { public List<TraceEventRecords> getElementCollection() {
return this.elementCollection; return this.elementCollection;
} }
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -31,9 +32,12 @@ import kieker.common.record.flow.IFlowRecord; ...@@ -31,9 +32,12 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReduction extends Analysis { public class TcpTraceReduction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000; private static final int TCP_RELAY_MAX_SIZE = 100000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>();
private Thread tcpThread; private Thread tcpThread;
private Thread clockThread; private Thread clockThread;
...@@ -87,9 +91,6 @@ public class TcpTraceReduction extends Analysis { ...@@ -87,9 +91,6 @@ public class TcpTraceReduction extends Analysis {
return pipeline; return pipeline;
} }
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage) { final StageWithPort<Void, Long> clockStage) {
// create stages // create stages
...@@ -101,7 +102,8 @@ public class TcpTraceReduction extends Analysis { ...@@ -101,7 +102,8 @@ public class TcpTraceReduction extends Analysis {
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages // connect stages
SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
...@@ -143,6 +145,16 @@ public class TcpTraceReduction extends Analysis { ...@@ -143,6 +145,16 @@ public class TcpTraceReduction extends Analysis {
this.clockThread.interrupt(); this.clockThread.interrupt();
} }
@Override
public void onTerminate() {
int maxSize = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
super.onTerminate();
}
public List<TraceEventRecords> getElementCollection() { public List<TraceEventRecords> getElementCollection() {
return this.elementCollection; return this.elementCollection;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment