Skip to content
Snippets Groups Projects
Commit 0a797e14 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed TcpTraceReconstruction;

reintroduced Pipeline
parent 615301a1
No related branches found
No related tags found
No related merge requests found
#FindBugs User Preferences
#Tue Nov 04 15:50:16 CET 2014
#Fri Dec 05 08:18:05 CET 2014
detector_threshold=3
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
package teetime.stage;
import java.util.List;
import teetime.framework.IStage;
import teetime.framework.InputPort;
import teetime.framework.TerminationStrategy;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
/**
*
* @author Christian Wulf
*
* @param <L>
* the type of the last stage in this pipeline
*/
public class Pipeline<L extends IStage> implements IStage {
private final IStage firstStage;
private final L lastStage;
public Pipeline(final IStage firstStage, final L lastStage) {
super();
this.firstStage = firstStage;
this.lastStage = lastStage;
}
@Override
public TerminationStrategy getTerminationStrategy() {
return firstStage.getTerminationStrategy();
}
@Override
public void terminate() {
firstStage.terminate();
}
@Override
public boolean shouldBeTerminated() {
return firstStage.shouldBeTerminated();
}
@Override
public String getId() {
return firstStage.getId();
}
@Override
public void executeWithPorts() {
firstStage.executeWithPorts();
}
@Override
public IStage getParentStage() {
return firstStage.getParentStage();
}
@Override
public void setParentStage(final IStage parentStage, final int index) {
firstStage.setParentStage(parentStage, index);
}
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
firstStage.onSignal(signal, inputPort);
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
lastStage.validateOutputPorts(invalidPortConnections);
}
public L getLastStage() {
return lastStage;
}
}
......@@ -6,9 +6,14 @@ import java.util.List;
import teetime.framework.IStage;
import teetime.framework.RunnableStage;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SpScPipe;
import teetime.stage.InstanceOfFilter;
import teetime.stage.Pipeline;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
......@@ -29,15 +34,23 @@ public class TcpTraceReconstruction {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>();
private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>();
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private Thread tcpThread;
private Thread[] workerThreads;
private int numWorkerThreads;
public TcpTraceReconstruction() {
intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
}
public void init() {
IStage tcpPipeline = this.buildTcpPipeline();
Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
......@@ -49,13 +62,13 @@ public class TcpTraceReconstruction {
}
}
private IStage buildTcpPipeline() {
private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
return tcpReader;
return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
}
private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) {
......@@ -67,12 +80,12 @@ public class TcpTraceReconstruction {
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort());
return relay;
}
......@@ -98,8 +111,9 @@ public class TcpTraceReconstruction {
public void onTerminate() {
int maxNumWaits = 0;
for (SpScPipe pipe : this.tcpRelayPipes) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
for (IPipe pipe : this.tcpRelayPipes) {
SpScPipe interThreadPipe = (SpScPipe) pipe;
maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment