From 0a797e14968d50d0925da0d2b7073918a0788df6 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 5 Dec 2014 08:42:58 +0100 Subject: [PATCH] fixed TcpTraceReconstruction; reintroduced Pipeline --- .settings/edu.umd.cs.findbugs.core.prefs | 2 +- src/main/java/teetime/stage/Pipeline.java | 78 +++++++++++++++++++ .../kiekerdays/TcpTraceReconstruction.java | 38 ++++++--- 3 files changed, 105 insertions(+), 13 deletions(-) create mode 100644 src/main/java/teetime/stage/Pipeline.java diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index cc4d4561..d8e404f2 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Tue Nov 04 15:50:16 CET 2014 +#Fri Dec 05 08:18:05 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/stage/Pipeline.java b/src/main/java/teetime/stage/Pipeline.java new file mode 100644 index 00000000..02d8fb7c --- /dev/null +++ b/src/main/java/teetime/stage/Pipeline.java @@ -0,0 +1,78 @@ +package teetime.stage; + +import java.util.List; + +import teetime.framework.IStage; +import teetime.framework.InputPort; +import teetime.framework.TerminationStrategy; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; + +/** + * + * @author Christian Wulf + * + * @param <L> + * the type of the last stage in this pipeline + */ +public class Pipeline<L extends IStage> implements IStage { + + private final IStage firstStage; + private final L lastStage; + + public Pipeline(final IStage firstStage, final L lastStage) { + super(); + this.firstStage = firstStage; + this.lastStage = lastStage; + } + + @Override + public TerminationStrategy getTerminationStrategy() { + return firstStage.getTerminationStrategy(); + } + + @Override + public void terminate() { + firstStage.terminate(); + } + + @Override + public boolean shouldBeTerminated() { + return firstStage.shouldBeTerminated(); + } + + @Override + public String getId() { + return firstStage.getId(); + } + + @Override + public void executeWithPorts() { + firstStage.executeWithPorts(); + } + + @Override + public IStage getParentStage() { + return firstStage.getParentStage(); + } + + @Override + public void setParentStage(final IStage parentStage, final int index) { + firstStage.setParentStage(parentStage, index); + } + + @Override + public void onSignal(final ISignal signal, final InputPort<?> inputPort) { + firstStage.onSignal(signal, inputPort); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + lastStage.validateOutputPorts(invalidPortConnections); + } + + public L getLastStage() { + return lastStage; + } + +} diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index cf8bf96f..43668d41 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -6,9 +6,14 @@ import java.util.List; import teetime.framework.IStage; import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.InstanceOfFilter; +import teetime.stage.Pipeline; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; @@ -29,15 +34,23 @@ public class TcpTraceReconstruction { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>(); + private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>(); + + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; private Thread tcpThread; private Thread[] workerThreads; private int numWorkerThreads; + public TcpTraceReconstruction() { + intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + } + public void init() { - IStage tcpPipeline = this.buildTcpPipeline(); + Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -49,13 +62,13 @@ public class TcpTraceReconstruction { } } - private IStage buildTcpPipeline() { + private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); - SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort()); - return tcpReader; + return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor); } private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) { @@ -67,12 +80,12 @@ public class TcpTraceReconstruction { Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); - SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), instanceOfFilter.getInputPort()); + intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); return relay; } @@ -98,8 +111,9 @@ public class TcpTraceReconstruction { public void onTerminate() { int maxNumWaits = 0; - for (SpScPipe pipe : this.tcpRelayPipes) { - maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); + for (IPipe pipe : this.tcpRelayPipes) { + SpScPipe interThreadPipe = (SpScPipe) pipe; + maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); } -- GitLab