Skip to content
Snippets Groups Projects
Commit 3539ee89 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed TcpTraceReduction

parent 0a797e14
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,7 @@ import teetime.framework.validation.InvalidPortConnection;
* @param <L>
* the type of the last stage in this pipeline
*/
public class Pipeline<L extends IStage> implements IStage {
public final class Pipeline<L extends IStage> implements IStage {
private final IStage firstStage;
private final L lastStage;
......
......@@ -8,10 +8,15 @@ import java.util.TreeMap;
import teetime.framework.IStage;
import teetime.framework.RunnableStage;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SpScPipe;
import teetime.stage.Clock;
import teetime.stage.InstanceOfFilter;
import teetime.stage.Pipeline;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
......@@ -36,7 +41,7 @@ public class TcpTraceReduction {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>();
private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>();
private Thread tcpThread;
private Thread clockThread;
......@@ -44,11 +49,19 @@ public class TcpTraceReduction {
private int numWorkerThreads;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
public TcpTraceReduction() {
intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
}
public void init() {
IStage tcpPipeline = this.buildTcpPipeline();
Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
IStage clockStage = this.buildClockPipeline(5000);
Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(5000);
this.clockThread = new Thread(new RunnableStage(clockStage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
......@@ -60,24 +73,24 @@ public class TcpTraceReduction {
}
}
private IStage buildTcpPipeline() {
private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
return tcpReader;
return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
}
private IStage buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
return clock;
return new Pipeline<Distributor<Long>>(clock, distributor);
}
private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) {
......@@ -90,15 +103,15 @@ public class TcpTraceReduction {
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
intraThreadPipeFactory.create(traceReductionFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
return relay;
}
......@@ -126,8 +139,10 @@ public class TcpTraceReduction {
public void onTerminate() {
int maxNumWaits = 0;
for (SpScPipe pipe : this.tcpRelayPipes) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
for (IPipe pipe : this.tcpRelayPipes) {
SpScPipe interThreadPipe = (SpScPipe) pipe;
// TODO introduce IInterThreadPipe
maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment