From 0d1b1e0eca8f9be316330d170e5f216aaf310403 Mon Sep 17 00:00:00 2001
From: Nils Christian Ehmke <nils@rhocas.de>
Date: Fri, 27 Feb 2015 12:46:49 +0100
Subject: [PATCH] Modified the composite stages by using the new methods from
 the TeeTime API

---
 .../stages/OperationCallHandlerComposite.java | 24 ++++++----------
 .../importer/stages/ReadingComposite.java     |  7 +----
 .../stages/TraceAggregationComposite.java     | 22 ++++++---------
 .../stages/TraceReconstructionComposite.java  | 28 ++++++++-----------
 4 files changed, 30 insertions(+), 51 deletions(-)

diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java
index f11321d3..5b89e956 100644
--- a/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java
+++ b/src/main/java/kieker/diagnosis/model/importer/stages/OperationCallHandlerComposite.java
@@ -26,10 +26,6 @@ import kieker.diagnosis.domain.Trace;
 import teetime.framework.CompositeStage;
 import teetime.framework.InputPort;
 import teetime.framework.Stage;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.basic.distributor.CopyByReferenceStrategy;
 import teetime.stage.basic.distributor.Distributor;
@@ -58,17 +54,15 @@ public final class OperationCallHandlerComposite extends CompositeStage {
 
 		this.inputPort = this.operationCallExtractor.getInputPort();
 
-		// Connect the stages
-		final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		pipeFactory.create(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort());
-		pipeFactory.create(distributor1.getNewOutputPort(), this.callCollector.getInputPort());
-		pipeFactory.create(distributor1.getNewOutputPort(), failedCallFilter.getInputPort());
-		pipeFactory.create(distributor1.getNewOutputPort(), callAggregator.getInputPort());
-		pipeFactory.create(callAggregator.getOutputPort(), distributor2.getInputPort());
-		pipeFactory.create(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort());
-		pipeFactory.create(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort());
-		pipeFactory.create(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort());
-		pipeFactory.create(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort());
+		super.connectStages(this.operationCallExtractor.getOutputPort(), distributor1.getInputPort());
+		super.connectStages(distributor1.getNewOutputPort(), this.callCollector.getInputPort());
+		super.connectStages(distributor1.getNewOutputPort(), failedCallFilter.getInputPort());
+		super.connectStages(distributor1.getNewOutputPort(), callAggregator.getInputPort());
+		super.connectStages(callAggregator.getOutputPort(), distributor2.getInputPort());
+		super.connectStages(distributor2.getNewOutputPort(), this.aggCallCollector.getInputPort());
+		super.connectStages(distributor2.getNewOutputPort(), aggFailedCallFilter.getInputPort());
+		super.connectStages(aggFailedCallFilter.getOutputPort(), this.aggFailedCallCollector.getInputPort());
+		super.connectStages(failedCallFilter.getOutputPort(), this.failedCallCollector.getInputPort());
 	}
 
 	public InputPort<Trace> getInputPort() {
diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java
index 0d1a1683..dd5065f8 100644
--- a/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java
+++ b/src/main/java/kieker/diagnosis/model/importer/stages/ReadingComposite.java
@@ -24,10 +24,6 @@ import kieker.common.record.IMonitoringRecord;
 import teetime.framework.CompositeStage;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.InitialElementProducer;
 import teetime.stage.className.ClassNameRegistryRepository;
 import teetime.stage.io.filesystem.Dir2RecordsFilter;
@@ -46,8 +42,7 @@ public final class ReadingComposite extends CompositeStage {
 		this.producer = new InitialElementProducer<>(importDirectory);
 		this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository());
 
-		final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		pipeFactory.create(this.producer.getOutputPort(), this.reader.getInputPort());
+		super.connectStages(this.producer.getOutputPort(), this.reader.getInputPort());
 	}
 
 	public OutputPort<IMonitoringRecord> getOutputPort() {
diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java
index 1a656a2c..9fdf1c08 100644
--- a/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java
+++ b/src/main/java/kieker/diagnosis/model/importer/stages/TraceAggregationComposite.java
@@ -25,10 +25,6 @@ import kieker.diagnosis.domain.Trace;
 import teetime.framework.CompositeStage;
 import teetime.framework.InputPort;
 import teetime.framework.Stage;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.basic.distributor.CopyByReferenceStrategy;
 import teetime.stage.basic.distributor.Distributor;
@@ -47,7 +43,8 @@ public final class TraceAggregationComposite extends CompositeStage {
 	private final CollectorSink<AggregatedTrace> failureContainingTracesCollector;
 	private final AggregatedTraceStatisticsDecorator statisticsDecorator;
 
-	public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces, final List<AggregatedTrace> failureContainingTraces) {
+	public TraceAggregationComposite(final List<AggregatedTrace> traces, final List<AggregatedTrace> failedTraces,
+			final List<AggregatedTrace> failureContainingTraces) {
 		this.aggregator = new TraceAggregator();
 		this.statisticsDecorator = new AggregatedTraceStatisticsDecorator();
 
@@ -59,16 +56,15 @@ public final class TraceAggregationComposite extends CompositeStage {
 		this.failedTracesCollector = new CollectorSink<>(failedTraces);
 		this.failureContainingTracesCollector = new CollectorSink<>(failureContainingTraces);
 
-		final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		pipeFactory.create(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort());
-		pipeFactory.create(this.statisticsDecorator.getOutputPort(), distributor.getInputPort());
+		super.connectStages(this.aggregator.getOutputPort(), this.statisticsDecorator.getInputPort());
+		super.connectStages(this.statisticsDecorator.getOutputPort(), distributor.getInputPort());
 
-		pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
 
-		pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
-		pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
+		super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
+		super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
 	}
 
 	public InputPort<Trace> getInputPort() {
diff --git a/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java b/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java
index f8bcc024..be048330 100644
--- a/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java
+++ b/src/main/java/kieker/diagnosis/model/importer/stages/TraceReconstructionComposite.java
@@ -28,10 +28,6 @@ import teetime.framework.CompositeStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.MultipleInstanceOfFilter;
 import teetime.stage.basic.distributor.CopyByReferenceStrategy;
@@ -69,19 +65,17 @@ public final class TraceReconstructionComposite extends CompositeStage {
 
 		this.outputPort = this.statisticsDecorator.getOutputPort();
 
-		final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-
-		pipeFactory.create(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort());
-		pipeFactory.create(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort());
-		pipeFactory.create(reconstructor.getOutputPort(), merger.getNewInputPort());
-		pipeFactory.create(legacyReconstructor.getOutputPort(), merger.getNewInputPort());
-		pipeFactory.create(merger.getOutputPort(), distributor.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
-		pipeFactory.create(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort());
-		pipeFactory.create(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
-		pipeFactory.create(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
+		super.connectStages(this.typeFilter.getOutputPortForType(IFlowRecord.class), reconstructor.getInputPort());
+		super.connectStages(this.typeFilter.getOutputPortForType(OperationExecutionRecord.class), legacyReconstructor.getInputPort());
+		super.connectStages(reconstructor.getOutputPort(), merger.getNewInputPort());
+		super.connectStages(legacyReconstructor.getOutputPort(), merger.getNewInputPort());
+		super.connectStages(merger.getOutputPort(), distributor.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), failedTraceFilter.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
+		super.connectStages(distributor.getNewOutputPort(), this.statisticsDecorator.getInputPort());
+		super.connectStages(failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
+		super.connectStages(failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
 	}
 
 	public InputPort<IMonitoringRecord> getInputPort() {
-- 
GitLab