Skip to content
Snippets Groups Projects
Commit c97e5bc2 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

adapted api changes

parent 2ce14d6b
No related branches found
No related tags found
No related merge requests found
......@@ -19,11 +19,8 @@ import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.Stage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.Stage;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.className.ClassNameRegistryRepository;
......@@ -39,10 +36,8 @@ import kieker.common.record.IMonitoringRecord;
public class RecordReaderConfiguration extends AnalysisConfiguration {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private final IPipeFactory intraThreadPipeFactory;
public RecordReaderConfiguration() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
this.buildConfiguration();
}
......@@ -60,8 +55,8 @@ public class RecordReaderConfiguration extends AnalysisConfiguration {
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
// connect stages
intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
connectIntraThreads(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
connectIntraThreads(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
return initialElementProducer;
}
......
......@@ -17,12 +17,9 @@ package teetime.examples.traceReading;
import java.util.List;
import teetime.framework.Stage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Pipeline;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.Stage;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.ElementThroughputMeasuringStage;
......@@ -36,12 +33,8 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati
private Counter<IMonitoringRecord> recordCounter;
private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
public TcpTraceLoggingExtAnalysisConfiguration() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
......@@ -58,7 +51,7 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati
clockStage.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
intraThreadPipeFactory.create(clockStage.getOutputPort(), distributor.getInputPort());
connectIntraThreads(clockStage.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<Long>>(clockStage, distributor);
}
......@@ -69,12 +62,12 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
intraThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
intraThreadPipeFactory.create(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
connectIntraThreads(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
connectIntraThreads(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
connectIntraThreads(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
// intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), endStage.getInputPort());
interThreadPipeFactory.create(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
connectBoundedInterThreads(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
return tcpReader;
}
......
......@@ -20,9 +20,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Pipeline;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.ElementThroughputMeasuringStage;
......@@ -50,12 +47,7 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration {
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<EventBasedTrace> traceThroughputFilter;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
public TcpTraceReconstructionConf() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
init();
}
......@@ -76,7 +68,7 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration {
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
connectIntraThreads(clock.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<Long>>(clock, distributor);
}
......@@ -94,18 +86,18 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration {
Sink<EventBasedTrace> endStage = new Sink<EventBasedTrace>();
// connect stages
interThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// intraThreadPipeFactory.create(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort());
intraThreadPipeFactory.create(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), endStage.getInputPort());
interThreadPipeFactory.create(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
connectBoundedInterThreads(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
connectIntraThreads(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// connectIntraThreads(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// connectIntraThreads(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort());
connectIntraThreads(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
connectIntraThreads(this.traceCounter.getOutputPort(), endStage.getInputPort());
connectBoundedInterThreads(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
connectBoundedInterThreads(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
return tcpReader;
}
......
......@@ -21,9 +21,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Cache;
import teetime.stage.Clock;
import teetime.stage.CollectorSink;
......@@ -50,8 +47,6 @@ public class TraceReconstructionConf extends AnalysisConfiguration {
private final List<EventBasedTrace> elementCollection = new LinkedList<EventBasedTrace>();
private final File inputDir;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private ClassNameRegistryRepository classNameRegistryRepository;
......@@ -61,8 +56,6 @@ public class TraceReconstructionConf extends AnalysisConfiguration {
public TraceReconstructionConf(final File inputDir) {
this.inputDir = inputDir;
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
init();
}
......@@ -105,20 +98,20 @@ public class TraceReconstructionConf extends AnalysisConfiguration {
stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// connect stages
intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), cache.getInputPort());
intraThreadPipeFactory.create(cache.getOutputPort(), stringBufferFilter.getInputPort());
intraThreadPipeFactory.create(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), this.throughputFilter.getInputPort());
intraThreadPipeFactory.create(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort());
intraThreadPipeFactory.create(merger.getOutputPort(), this.traceCounter.getInputPort());
intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), collector.getInputPort());
interThreadPipeFactory.create(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
connectIntraThreads(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
connectIntraThreads(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
connectIntraThreads(this.recordCounter.getOutputPort(), cache.getInputPort());
connectIntraThreads(cache.getOutputPort(), stringBufferFilter.getInputPort());
connectIntraThreads(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), this.throughputFilter.getInputPort());
connectIntraThreads(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// connectIntraThreads(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort());
connectIntraThreads(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort());
connectIntraThreads(merger.getOutputPort(), this.traceCounter.getInputPort());
connectIntraThreads(this.traceCounter.getOutputPort(), collector.getInputPort());
connectBoundedInterThreads(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
return initialElementProducer;
}
......
......@@ -27,9 +27,6 @@ import teetime.framework.Pipeline;
import teetime.framework.Stage;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.ElementDelayMeasuringStage;
......@@ -71,8 +68,6 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
private final StageFactory<ElementThroughputMeasuringStage<EventBasedTrace>> traceThroughputFilterFactory;
private final List<IMonitorablePipe> tcpRelayPipes = new LinkedList<IMonitorablePipe>();
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReconstructionAnalysisWithThreadsConfiguration(final int numWorkerThreads) {
......@@ -93,8 +88,6 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
throw new IllegalArgumentException(e);
}
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
......@@ -118,7 +111,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
TcpReaderStage tcpReader = new TcpReaderStage();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
connectIntraThreads(tcpReader.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
}
......@@ -129,7 +122,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
connectIntraThreads(clock.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<Long>>(clock, distributor);
}
......@@ -182,22 +175,22 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
IPipe tcpRelayPipe = connectBoundedInterThreads(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add((IMonitorablePipe) tcpRelayPipe);
// SysOutFilter<EventBasedTrace> sysout = new SysOutFilter<EventBasedTrace>(tcpRelayPipe);
interThreadPipeFactory.create(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort());
intraThreadPipeFactory.create(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
intraThreadPipeFactory.create(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
// intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
intraThreadPipeFactory.create(traceCounter.getOutputPort(), endStage.getInputPort());
connectBoundedInterThreads(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
connectBoundedInterThreads(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
connectIntraThreads(relay.getOutputPort(), recordCounter.getInputPort());
connectIntraThreads(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
connectIntraThreads(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
connectIntraThreads(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
// connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// connectIntraThreads(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
connectIntraThreads(traceCounter.getOutputPort(), endStage.getInputPort());
return relay;
}
......
......@@ -29,9 +29,6 @@ import teetime.framework.Pipeline;
import teetime.framework.Stage;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.ElementDelayMeasuringStage;
......@@ -67,8 +64,6 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReductionAnalysisWithThreadsConfiguration(final int numWorkerThreads) {
......@@ -89,8 +84,6 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC
this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
this.trace2buffer = new TreeMap<EventBasedTrace, TraceAggregationBuffer>(new EventBasedTraceComperator());
this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
this.interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
......@@ -114,7 +107,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC
final TcpReaderStage tcpReader = new TcpReaderStage();
final Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
connectIntraThreads(tcpReader.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
}
......@@ -125,7 +118,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC
clock.setIntervalDelayInMs(intervalDelayInMs);
final Distributor<Long> distributor = new Distributor<Long>();
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
connectIntraThreads(clock.getOutputPort(), distributor.getInputPort());
return new Pipeline<Distributor<Long>>(clock, distributor);
}
......@@ -182,23 +175,23 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC
Sink<TraceAggregationBuffer> endStage = new Sink<TraceAggregationBuffer>();
// connect stages
final IPipe pipe = interThreadPipeFactory.create(tcpPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
final IPipe pipe = connectBoundedInterThreads(tcpPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add((IMonitorablePipe) pipe);
intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort());
intraThreadPipeFactory.create(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
intraThreadPipeFactory.create(traceReductionFilter.getOutputPort(), traceCounter.getInputPort());
intraThreadPipeFactory.create(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort());
intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
connectIntraThreads(relay.getOutputPort(), recordCounter.getInputPort());
connectIntraThreads(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
connectIntraThreads(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort());
connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
connectIntraThreads(traceReductionFilter.getOutputPort(), traceCounter.getInputPort());
connectIntraThreads(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort());
connectIntraThreads(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// connectIntraThreads(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
interThreadPipeFactory.create(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
connectBoundedInterThreads(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
connectBoundedInterThreads(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
return relay;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment