Skip to content
Snippets Groups Projects
Commit 0d1b1e0e authored by Nils Christian Ehmke's avatar Nils Christian Ehmke
Browse files

Modified the composite stages by using the new methods from the TeeTime API

parent 9aa7a6f5
No related branches found
No related tags found
No related merge requests found
......@@ -26,10 +26,6 @@ import kieker.diagnosis.domain.Trace;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
......@@ -58,17 +54,15 @@ public final class OperationCallHandlerComposite extends CompositeStage {
this.inputPort = this.operationCallExtractor.getInputPort();
// Connect the stages
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort());
pipeFactory.create(distributor1.getNewOutputPort(), this.callCollector.getInputPort());
pipeFactory.create(distributor1.getNewOutputPort(), failedCallFilter.getInputPort());
pipeFactory.create(distributor1.getNewOutputPort(), callAggregator.getInputPort());
pipeFactory.create(callAggregator.getOutputPort(), distributor2.getInputPort());
pipeFactory.create(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort());
pipeFactory.create(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort());
pipeFactory.create(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort());
pipeFactory.create(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort());
super.connectStages(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort());
super.connectStages(distributor1.getNewOutputPort(), this.callCollector.getInputPort());
super.connectStages(distributor1.getNewOutputPort(), failedCallFilter.getInputPort());
super.connectStages(distributor1.getNewOutputPort(), callAggregator.getInputPort());
super.connectStages(callAggregator.getOutputPort(), distributor2.getInputPort());
super.connectStages(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort());
super.connectStages(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort());
super.connectStages(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort());
super.connectStages(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort());
}
public InputPort<Trace> getInputPort() {
......
......@@ -24,10 +24,6 @@ import kieker.common.record.IMonitoringRecord;
import teetime.framework.CompositeStage;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.InitialElementProducer;
import teetime.stage.className.ClassNameRegistryRepository;
import teetime.stage.io.filesystem.Dir2RecordsFilter;
......@@ -46,8 +42,7 @@ public final class ReadingComposite extends CompositeStage {
this.producer = new InitialElementProducer<>(importDirectory);
this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository());
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.producer.getOutputPort(), this.reader.getInputPort());
super.connectStages(this.producer.getOutputPort(), this.reader.getInputPort());
}
public OutputPort<IMonitoringRecord> getOutputPort() {
......
......@@ -25,10 +25,6 @@ import kieker.diagnosis.domain.Trace;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
......@@ -47,7 +43,8 @@ public final class TraceAggregationComposite extends CompositeStage {
private final CollectorSink<AggregatedTrace> failureContainingTracesCollector;
private final AggregatedTraceStatisticsDecorator statisticsDecorator;
public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces, final List<AggregatedTrace> failureContainingTraces) {
public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces,
final List<AggregatedTrace> failureContainingTraces) {
this.aggregator = new TraceAggregator();
this.statisticsDecorator = new AggregatedTraceStatisticsDecorator();
......@@ -59,16 +56,15 @@ public final class TraceAggregationComposite extends CompositeStage {
this.failedTracesCollector = new CollectorSink<>(failedTraces);
this.failureContainingTracesCollector = new CollectorSink<>(failureContainingTraces);
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort());
pipeFactory.create(this.statisticsDecorator.getOutputPort(), distributor.getInputPort());
super.connectStages(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort());
super.connectStages(this.statisticsDecorator.getOutputPort(), distributor.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
}
public InputPort<Trace> getInputPort() {
......
......@@ -28,10 +28,6 @@ import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.MultipleInstanceOfFilter;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
......@@ -69,19 +65,17 @@ public final class TraceReconstructionComposite extends CompositeStage {
this.outputPort = this.statisticsDecorator.getOutputPort();
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort());
pipeFactory.create(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort());
pipeFactory.create(reconstructor.getOutputPort(), merger.getNewInputPort());
pipeFactory.create(legacyReconstructor.getOutputPort(), merger.getNewInputPort());
pipeFactory.create(merger.getOutputPort(), distributor.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort());
pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
super.connectStages(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort());
super.connectStages(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort());
super.connectStages(reconstructor.getOutputPort(), merger.getNewInputPort());
super.connectStages(legacyReconstructor.getOutputPort(), merger.getNewInputPort());
super.connectStages(merger.getOutputPort(), distributor.getInputPort());
super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
super.connectStages(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort());
super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
}
public InputPort<IMonitoringRecord> getInputPort() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment