diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index cc4d45619732789dbaa198f17c7ed2060050bf16..d8e404f24575b4df84b5ea513ee2919ea6f743f5 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Tue Nov 04 15:50:16 CET 2014 +#Fri Dec 05 08:18:05 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/stage/Pipeline.java b/src/main/java/teetime/stage/Pipeline.java new file mode 100644 index 0000000000000000000000000000000000000000..02d8fb7c99da6b4a3b2311667bf70a2cb4d71ae7 --- /dev/null +++ b/src/main/java/teetime/stage/Pipeline.java @@ -0,0 +1,78 @@ +package teetime.stage; + +import java.util.List; + +import teetime.framework.IStage; +import teetime.framework.InputPort; +import teetime.framework.TerminationStrategy; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; + +/** + * + * @author Christian Wulf + * + * @param <L> + * the type of the last stage in this pipeline + */ +public class Pipeline<L extends IStage> implements IStage { + + private final IStage firstStage; + private final L lastStage; + + public Pipeline(final IStage firstStage, final L lastStage) { + super(); + this.firstStage = firstStage; + this.lastStage = lastStage; + } + + @Override + public TerminationStrategy getTerminationStrategy() { + return firstStage.getTerminationStrategy(); + } + + @Override + public void terminate() { + firstStage.terminate(); + } + + @Override + public boolean shouldBeTerminated() { + return firstStage.shouldBeTerminated(); + } + + @Override + public String getId() { + return firstStage.getId(); + } + + @Override + public void executeWithPorts() { + firstStage.executeWithPorts(); + } + + @Override + public IStage getParentStage() { + return firstStage.getParentStage(); + } + + @Override + public void setParentStage(final IStage parentStage, final int index) { + firstStage.setParentStage(parentStage, index); + } + + @Override + public void onSignal(final ISignal signal, final InputPort<?> inputPort) { + firstStage.onSignal(signal, inputPort); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + lastStage.validateOutputPorts(invalidPortConnections); + } + + public L getLastStage() { + return lastStage; + } + +} diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index cf8bf96fef4b31ee5f190e2e4df0b2a9df96898b..43668d419a160043c56890c6c6823bfa24e0eba1 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -6,9 +6,14 @@ import java.util.List; import teetime.framework.IStage; import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.InstanceOfFilter; +import teetime.stage.Pipeline; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; @@ -29,15 +34,23 @@ public class TcpTraceReconstruction { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>(); + private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>(); + + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; private Thread tcpThread; private Thread[] workerThreads; private int numWorkerThreads; + public TcpTraceReconstruction() { + intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + } + public void init() { - IStage tcpPipeline = this.buildTcpPipeline(); + Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -49,13 +62,13 @@ public class TcpTraceReconstruction { } } - private IStage buildTcpPipeline() { + private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); - SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort()); - return tcpReader; + return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor); } private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) { @@ -67,12 +80,12 @@ public class TcpTraceReconstruction { Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); - SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), instanceOfFilter.getInputPort()); + intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); return relay; } @@ -98,8 +111,9 @@ public class TcpTraceReconstruction { public void onTerminate() { int maxNumWaits = 0; - for (SpScPipe pipe : this.tcpRelayPipes) { - maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); + for (IPipe pipe : this.tcpRelayPipes) { + SpScPipe interThreadPipe = (SpScPipe) pipe; + maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); }