diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java index f11321d3a4a0a106721e5f9c57cc4376a1032e62..5b89e95624e7fcbe8b920ba8ddd5f4f57eba3258 100644 --- a/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java +++ b/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java @@ -26,10 +26,6 @@ import kieker.diagnosis.domain.Trace; import teetime.framework.CompositeStage; import teetime.framework.InputPort; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.basic.distributor.CopyByReferenceStrategy; import teetime.stage.basic.distributor.Distributor; @@ -58,17 +54,15 @@ public final class OperationCallHandlerComposite extends CompositeStage { this.inputPort = this.operationCallExtractor.getInputPort(); - // Connect the stages - final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - pipeFactory.create(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort()); - pipeFactory.create(distributor1.getNewOutputPort(), this.callCollector.getInputPort()); - pipeFactory.create(distributor1.getNewOutputPort(), failedCallFilter.getInputPort()); - pipeFactory.create(distributor1.getNewOutputPort(), callAggregator.getInputPort()); - pipeFactory.create(callAggregator.getOutputPort(), distributor2.getInputPort()); - pipeFactory.create(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort()); - pipeFactory.create(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort()); - pipeFactory.create(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort()); - pipeFactory.create(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort()); + super.connectStages(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort()); + super.connectStages(distributor1.getNewOutputPort(), this.callCollector.getInputPort()); + super.connectStages(distributor1.getNewOutputPort(), failedCallFilter.getInputPort()); + super.connectStages(distributor1.getNewOutputPort(), callAggregator.getInputPort()); + super.connectStages(callAggregator.getOutputPort(), distributor2.getInputPort()); + super.connectStages(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort()); + super.connectStages(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort()); + super.connectStages(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort()); + super.connectStages(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort()); } public InputPort<Trace> getInputPort() { diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java index 0d1a16839d927e30d3b0b88de182bd41ba479406..dd5065f827799a11373e239a97af95522103d869 100644 --- a/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java +++ b/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java @@ -24,10 +24,6 @@ import kieker.common.record.IMonitoringRecord; import teetime.framework.CompositeStage; import teetime.framework.OutputPort; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.InitialElementProducer; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.filesystem.Dir2RecordsFilter; @@ -46,8 +42,7 @@ public final class ReadingComposite extends CompositeStage { this.producer = new InitialElementProducer<>(importDirectory); this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository()); - final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - pipeFactory.create(this.producer.getOutputPort(), this.reader.getInputPort()); + super.connectStages(this.producer.getOutputPort(), this.reader.getInputPort()); } public OutputPort<IMonitoringRecord> getOutputPort() { diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java index 1a656a2cda482f2c5cf6b8e2cbdbbc0a18550ee2..9fdf1c08da9ec8b5408f3504e53358717b4db04b 100644 --- a/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java +++ b/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java @@ -25,10 +25,6 @@ import kieker.diagnosis.domain.Trace; import teetime.framework.CompositeStage; import teetime.framework.InputPort; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.basic.distributor.CopyByReferenceStrategy; import teetime.stage.basic.distributor.Distributor; @@ -47,7 +43,8 @@ public final class TraceAggregationComposite extends CompositeStage { private final CollectorSink<AggregatedTrace> failureContainingTracesCollector; private final AggregatedTraceStatisticsDecorator statisticsDecorator; - public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces, final List<AggregatedTrace> failureContainingTraces) { + public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces, + final List<AggregatedTrace> failureContainingTraces) { this.aggregator = new TraceAggregator(); this.statisticsDecorator = new AggregatedTraceStatisticsDecorator(); @@ -59,16 +56,15 @@ public final class TraceAggregationComposite extends CompositeStage { this.failedTracesCollector = new CollectorSink<>(failedTraces); this.failureContainingTracesCollector = new CollectorSink<>(failureContainingTraces); - final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - pipeFactory.create(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort()); - pipeFactory.create(this.statisticsDecorator.getOutputPort(), distributor.getInputPort()); + super.connectStages(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort()); + super.connectStages(this.statisticsDecorator.getOutputPort(), distributor.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort()); - pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); - pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort()); + super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); + super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort()); } public InputPort<Trace> getInputPort() { diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java index f8bcc02486bb0880d0e107ed3ef06f2a7e6f735a..be048330e28d5a8c959ad49ee4bcee15467ac05d 100644 --- a/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java +++ b/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java @@ -28,10 +28,6 @@ import teetime.framework.CompositeStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.MultipleInstanceOfFilter; import teetime.stage.basic.distributor.CopyByReferenceStrategy; @@ -69,19 +65,17 @@ public final class TraceReconstructionComposite extends CompositeStage { this.outputPort = this.statisticsDecorator.getOutputPort(); - final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - - pipeFactory.create(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort()); - pipeFactory.create(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort()); - pipeFactory.create(reconstructor.getOutputPort(), merger.getNewInputPort()); - pipeFactory.create(legacyReconstructor.getOutputPort(), merger.getNewInputPort()); - pipeFactory.create(merger.getOutputPort(), distributor.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort()); - pipeFactory.create(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort()); - pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); - pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort()); + super.connectStages(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort()); + super.connectStages(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort()); + super.connectStages(reconstructor.getOutputPort(), merger.getNewInputPort()); + super.connectStages(legacyReconstructor.getOutputPort(), merger.getNewInputPort()); + super.connectStages(merger.getOutputPort(), distributor.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort()); + super.connectStages(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort()); + super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); + super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort()); } public InputPort<IMonitoringRecord> getInputPort() {