From 615301a1f16de497a970afaa4fc4992b3d3cbebb Mon Sep 17 00:00:00 2001
From: Nils Christian Ehmke <nie@informatik.uni-kiel.de>
Date: Thu, 4 Dec 2014 12:16:48 +0100
Subject: [PATCH] Modified some code for the API changes in TeeTime.

---
 .../ClassNameRegistryCreationFilter.java      |  6 +--
 .../explorviz/KiekerRecordTcpReader.java      |  4 +-
 .../teetime/stage/io/database/DbReader.java   |  4 +-
 .../io/filesystem/Dir2RecordsFilter.java      | 19 +++++---
 .../binary/DirWithBin2RecordFilter.java       | 22 ++++++----
 .../binary/file/BinaryFile2RecordFilter.java  |  4 +-
 .../format/text/DirWithDat2RecordFilter.java  | 23 ++++++----
 .../text/file/DatFile2RecordFilter.java       | 24 +++++++----
 .../file/TextLine2MappingRegistryFilter.java  |  6 +--
 .../text/file/TextLine2RecordFilter.java      |  4 +-
 .../stringBuffer/StringBufferFilter.java      |  4 +-
 .../TraceReconstructionFilter.java            |  4 +-
 .../traceReduction/TraceReductionFilter.java  |  4 +-
 src/main/java/util/KiekerLoadDriver.java      | 13 ++----
 .../examples/kiekerdays/TcpTraceLogging.java  |  6 +--
 .../kiekerdays/TcpTraceLoggingExplorviz.java  | 11 ++---
 .../kiekerdays/TcpTraceReconstruction.java    | 23 +++-------
 .../kiekerdays/TcpTraceReduction.java         | 33 +++++---------
 .../RecordReaderConfiguration.java            |  6 +--
 .../TcpTraceLoggingExtAnalysis.java           | 23 +++-------
 .../TcpTraceReconstructionAnalysis.java       | 24 ++++-------
 .../TraceReconstructionAnalysis.java          | 12 ++----
 ...ctionAnalysisWithThreadsConfiguration.java | 43 ++++++-------------
 .../TcpTraceReductionAnalysisWithThreads.java | 41 ++++++------------
 24 files changed, 150 insertions(+), 213 deletions(-)

diff --git a/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java b/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java
index ee175d08..e8c3a29b 100644
--- a/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java
+++ b/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java
@@ -19,15 +19,15 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.OutputPort;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
-public class ClassNameRegistryCreationFilter extends ConsumerStage<File> {
+public class ClassNameRegistryCreationFilter extends AbstractConsumerStage<File> {
 
 	private final OutputPort<File> outputPort = this.createOutputPort();
 
diff --git a/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java
index cd0aa516..8fc450e7 100644
--- a/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java
+++ b/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java
@@ -9,7 +9,7 @@ import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
-import teetime.framework.ProducerStage;
+import teetime.framework.AbstractProducerStage;
 
 import kieker.common.record.IMonitoringRecord;
 import kieker.common.record.flow.trace.operation.AfterOperationEvent;
@@ -17,7 +17,7 @@ import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
 import kieker.common.util.registry.ILookup;
 import kieker.common.util.registry.Lookup;
 
-public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> {
+public class KiekerRecordTcpReader extends AbstractProducerStage<IMonitoringRecord> {
 
 	private static final int MESSAGE_BUFFER_SIZE = 65535;
 
diff --git a/src/main/java/teetime/stage/io/database/DbReader.java b/src/main/java/teetime/stage/io/database/DbReader.java
index 076acb3d..3c3d6648 100644
--- a/src/main/java/teetime/stage/io/database/DbReader.java
+++ b/src/main/java/teetime/stage/io/database/DbReader.java
@@ -22,7 +22,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import teetime.framework.ProducerStage;
+import teetime.framework.AbstractProducerStage;
 
 import kieker.common.exception.MonitoringRecordException;
 import kieker.common.record.AbstractMonitoringRecord;
@@ -36,7 +36,7 @@ import kieker.common.record.IMonitoringRecord;
  * @since 1.10
  */
 // @Description("A reader which reads records from a database")
-public class DbReader extends ProducerStage<IMonitoringRecord> {
+public class DbReader extends AbstractProducerStage<IMonitoringRecord> {
 
 	/**
 	 * The classname of the driver used for the connection.
diff --git a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java
index d5749921..899ff1b6 100644
--- a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java
@@ -17,8 +17,8 @@ package teetime.stage.io.filesystem;
 
 import java.io.File;
 
+import teetime.framework.AbstractStage;
 import teetime.framework.InputPort;
-import teetime.framework.OldPipeline;
 import teetime.framework.OutputPort;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry;
@@ -41,10 +41,12 @@ import kieker.common.util.filesystem.FSUtil;
  *
  * @since 1.10
  */
-public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> {
+public class Dir2RecordsFilter extends AbstractStage {
 
 	private final PipeFactoryRegistry pipeFactoryRegistry = PipeFactoryRegistry.INSTANCE;
 	private ClassNameRegistryRepository classNameRegistryRepository;
+	private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter;
+	private final Merger<IMonitoringRecord> recordMerger;
 
 	/**
 	 * @since 1.10
@@ -80,8 +82,8 @@ public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilt
 		pipeFactory.create(binaryFile2RecordFilter.getOutputPort(), recordMerger.getNewInputPort());
 
 		// prepare pipeline
-		this.setFirstStage(classNameRegistryCreationFilter);
-		this.setLastStage(recordMerger);
+		this.classNameRegistryCreationFilter = classNameRegistryCreationFilter;
+		this.recordMerger = recordMerger;
 	}
 
 	/**
@@ -100,11 +102,16 @@ public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilt
 	}
 
 	public InputPort<File> getInputPort() {
-		return this.getFirstStage().getInputPort();
+		return classNameRegistryCreationFilter.getInputPort();
 	}
 
 	public OutputPort<IMonitoringRecord> getOutputPort() {
-		return this.getLastStage().getOutputPort();
+		return recordMerger.getOutputPort();
+	}
+
+	@Override
+	public void executeWithPorts() {
+		classNameRegistryCreationFilter.executeWithPorts();
 	}
 
 }
diff --git a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java
index feb0cb4f..341d316d 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java
@@ -2,8 +2,8 @@ package teetime.stage.io.filesystem.format.binary;
 
 import java.io.File;
 
+import teetime.framework.AbstractStage;
 import teetime.framework.InputPort;
-import teetime.framework.OldPipeline;
 import teetime.framework.OutputPort;
 import teetime.stage.className.ClassNameRegistryCreationFilter;
 import teetime.stage.className.ClassNameRegistryRepository;
@@ -11,18 +11,17 @@ import teetime.stage.io.filesystem.format.binary.file.BinaryFile2RecordFilter;
 
 import kieker.common.record.IMonitoringRecord;
 
-public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> {
+public class DirWithBin2RecordFilter extends AbstractStage {
 
 	private ClassNameRegistryRepository classNameRegistryRepository;
+	private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter;
+	private final BinaryFile2RecordFilter binaryFile2RecordFilter;
 
 	public DirWithBin2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
 		this.classNameRegistryRepository = classNameRegistryRepository;
 
-		final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository);
-		final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository);
-
-		this.setFirstStage(classNameRegistryCreationFilter);
-		this.setLastStage(binaryFile2RecordFilter);
+		classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository);
+		binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository);
 	}
 
 	public DirWithBin2RecordFilter() {
@@ -38,10 +37,15 @@ public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreati
 	}
 
 	public InputPort<File> getInputPort() {
-		return this.getFirstStage().getInputPort();
+		return this.classNameRegistryCreationFilter.getInputPort();
 	}
 
 	public OutputPort<IMonitoringRecord> getOutputPort() {
-		return this.getLastStage().getOutputPort();
+		return this.binaryFile2RecordFilter.getOutputPort();
+	}
+
+	@Override
+	public void executeWithPorts() {
+		classNameRegistryCreationFilter.executeWithPorts();
 	}
 }
diff --git a/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java
index 24af3af2..79670050 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java
@@ -19,7 +19,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.OutputPort;
 import teetime.stage.className.ClassNameRegistryRepository;
 
@@ -32,7 +32,7 @@ import kieker.common.util.filesystem.BinaryCompressionMethod;
  *
  * @since 1.10
  */
-public class BinaryFile2RecordFilter extends ConsumerStage<File> {
+public class BinaryFile2RecordFilter extends AbstractConsumerStage<File> {
 
 	private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort();
 
diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java
index 1db08481..9c450506 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java
@@ -2,8 +2,8 @@ package teetime.stage.io.filesystem.format.text;
 
 import java.io.File;
 
+import teetime.framework.AbstractStage;
 import teetime.framework.InputPort;
-import teetime.framework.OldPipeline;
 import teetime.framework.OutputPort;
 import teetime.stage.className.ClassNameRegistryCreationFilter;
 import teetime.stage.className.ClassNameRegistryRepository;
@@ -11,18 +11,17 @@ import teetime.stage.io.filesystem.format.text.file.DatFile2RecordFilter;
 
 import kieker.common.record.IMonitoringRecord;
 
-public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> {
+public class DirWithDat2RecordFilter extends AbstractStage {
 
 	private ClassNameRegistryRepository classNameRegistryRepository;
+	private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter;
+	private final DatFile2RecordFilter datFile2RecordFilter;
 
 	public DirWithDat2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
 		this.classNameRegistryRepository = classNameRegistryRepository;
 
-		final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository);
-		final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository);
-
-		this.setFirstStage(classNameRegistryCreationFilter);
-		this.setLastStage(datFile2RecordFilter);
+		classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository);
+		datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository);
 	}
 
 	public DirWithDat2RecordFilter() {
@@ -38,10 +37,16 @@ public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreati
 	}
 
 	public InputPort<File> getInputPort() {
-		return this.getFirstStage().getInputPort();
+		return this.classNameRegistryCreationFilter.getInputPort();
 	}
 
 	public OutputPort<IMonitoringRecord> getOutputPort() {
-		return this.getLastStage().getOutputPort();
+		return this.datFile2RecordFilter.getOutputPort();
+	}
+
+	@Override
+	public void executeWithPorts() {
+		this.classNameRegistryCreationFilter.executeWithPorts();
 	}
+
 }
diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java
index fba45931..c2ac888a 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java
@@ -17,9 +17,9 @@ package teetime.stage.io.filesystem.format.text.file;
 
 import java.io.File;
 
+import teetime.framework.AbstractStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
-import teetime.framework.OldPipeline;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.stage.className.ClassNameRegistryRepository;
 import teetime.stage.io.File2TextLinesFilter;
@@ -31,24 +31,30 @@ import kieker.common.record.IMonitoringRecord;
  *
  * @since 1.10
  */
-public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> {
+public class DatFile2RecordFilter extends AbstractStage {
 
-	public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
-		File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
-		TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
+	private final File2TextLinesFilter file2TextLinesFilter;
+	private final TextLine2RecordFilter textLine2RecordFilter;
 
-		this.setFirstStage(file2TextLinesFilter);
-		this.setLastStage(textLine2RecordFilter);
+	public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
+		file2TextLinesFilter = new File2TextLinesFilter();
+		textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
 
 		// BETTER let the framework choose the optimal pipe implementation
 		SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
 	}
 
 	public InputPort<File> getInputPort() {
-		return this.getFirstStage().getInputPort();
+		return this.file2TextLinesFilter.getInputPort();
 	}
 
 	public OutputPort<IMonitoringRecord> getOutputPort() {
-		return this.getLastStage().getOutputPort();
+		return this.textLine2RecordFilter.getOutputPort();
 	}
+
+	@Override
+	public void executeWithPorts() {
+		file2TextLinesFilter.executeWithPorts();
+	}
+
 }
diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java
index e2ee19c8..bb128cd6 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java
@@ -18,16 +18,16 @@ package teetime.stage.io.filesystem.format.text.file;
 
 import java.util.Map;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 
 import kieker.common.util.filesystem.FSUtil;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
-public class TextLine2MappingRegistryFilter extends ConsumerStage<String> {
+public class TextLine2MappingRegistryFilter extends AbstractConsumerStage<String> {
 
 	private final Map<Integer, String> stringRegistry;
 
diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java
index a39985f0..a165b77a 100644
--- a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java
+++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java
@@ -19,7 +19,7 @@ package teetime.stage.io.filesystem.format.text.file;
 import java.util.HashSet;
 import java.util.Set;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.OutputPort;
 import teetime.stage.className.ClassNameRegistryRepository;
 import teetime.stage.util.MappingException;
@@ -35,7 +35,7 @@ import kieker.common.record.IMonitoringRecord;
  *
  * @since 1.10
  */
-public class TextLine2RecordFilter extends ConsumerStage<TextLine> {
+public class TextLine2RecordFilter extends AbstractConsumerStage<TextLine> {
 
 	private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort();
 
diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java
index 6e2d9ac4..73a8e99c 100644
--- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java
+++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java
@@ -18,7 +18,7 @@ package teetime.stage.stringBuffer;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.OutputPort;
 import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler;
 import teetime.stage.stringBuffer.util.KiekerHashMap;
@@ -28,7 +28,7 @@ import teetime.stage.stringBuffer.util.KiekerHashMap;
  *
  * @since 1.10
  */
-public class StringBufferFilter<T> extends ConsumerStage<T> {
+public class StringBufferFilter<T> extends AbstractConsumerStage<T> {
 
 	private final OutputPort<T> outputPort = this.createOutputPort();
 
diff --git a/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java
index 14ae83e9..3652abe7 100644
--- a/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java
+++ b/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java
@@ -17,7 +17,7 @@ package teetime.stage.trace.traceReconstruction;
 
 import java.util.concurrent.TimeUnit;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.OutputPort;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
@@ -32,7 +32,7 @@ import kieker.common.record.flow.trace.TraceMetadata;
  *
  * @since 1.10
  */
-public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
+public class TraceReconstructionFilter extends AbstractConsumerStage<IFlowRecord> {
 
 	private final OutputPort<TraceEventRecords> traceValidOutputPort = this.createOutputPort();
 	private final OutputPort<TraceEventRecords> traceInvalidOutputPort = this.createOutputPort();
diff --git a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java
index 5c64c64f..3f5dda01 100644
--- a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java
+++ b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java
@@ -20,7 +20,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import teetime.framework.ConsumerStage;
+import teetime.framework.AbstractConsumerStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 
@@ -37,7 +37,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords;
  *
  * @since
  */
-public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
+public class TraceReductionFilter extends AbstractConsumerStage<TraceEventRecords> {
 
 	private final InputPort<Long> triggerInputPort = this.createInputPort();
 	private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort();
diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java
index 3fa220d1..409bb4bd 100644
--- a/src/main/java/util/KiekerLoadDriver.java
+++ b/src/main/java/util/KiekerLoadDriver.java
@@ -14,8 +14,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.stage.CollectorSink;
@@ -36,25 +35,21 @@ public class KiekerLoadDriver {
 	private long[] timings;
 
 	public KiekerLoadDriver(final File directory) {
-		HeadStage producerPipeline = this.buildProducerPipeline(directory);
+		IStage producerPipeline = this.buildProducerPipeline(directory);
 		this.runnableStage = new RunnableStage(producerPipeline);
 	}
 
-	private OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) {
+	private IStage buildProducerPipeline(final File directory) {
 		ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
 		// create stages
 		InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory);
 		Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
 		CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
 
-		final OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>();
-		pipeline.setFirstStage(initialElementProducer);
-		pipeline.setLastStage(collector);
-
 		SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
 		SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
 
-		return pipeline;
+		return initialElementProducer;
 	}
 
 	public Collection<IMonitoringRecord> load() {
diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java
index 3820209e..2bd12c2b 100644
--- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java
+++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java
@@ -1,6 +1,6 @@
 package teetime.examples.kiekerdays;
 
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.stage.io.network.TcpReader;
 
@@ -9,7 +9,7 @@ public class TcpTraceLogging {
 	private Thread tcpThread;
 
 	public void init() {
-		HeadStage tcpPipeline = this.buildTcpPipeline();
+		IStage tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 	}
 
@@ -24,7 +24,7 @@ public class TcpTraceLogging {
 		}
 	}
 
-	private HeadStage buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		// TCPReaderSink tcpReader = new TCPReaderSink();
 		TcpReader tcpReader = new TcpReader();
 
diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java
index f97afc82..3c9ddeaf 100644
--- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java
+++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java
@@ -1,7 +1,6 @@
 package teetime.examples.kiekerdays;
 
-import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.stage.basic.Sink;
@@ -14,7 +13,7 @@ public class TcpTraceLoggingExplorviz {
 	private Thread tcpThread;
 
 	public void init() {
-		HeadStage tcpPipeline = this.buildTcpPipeline();
+		IStage tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 	}
 
@@ -29,16 +28,12 @@ public class TcpTraceLoggingExplorviz {
 		}
 	}
 
-	private HeadStage buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader();
 		Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(endStage);
 		return tcpReader;
 	}
 
diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java
index 46b62adc..cf8bf96f 100644
--- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java
@@ -4,8 +4,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -38,32 +37,28 @@ public class TcpTraceReconstruction {
 	private int numWorkerThreads;
 
 	public void init() {
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		IStage tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			HeadStage pipeline = this.buildPipeline(tcpPipeline.getLastStage());
+			IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage());
 			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return tcpReader;
 	}
 
-	private HeadStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) {
+	private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
@@ -79,11 +74,7 @@ public class TcpTraceReconstruction {
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
 		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
-		pipeline.setFirstStage(relay);
-		pipeline.setLastStage(endStage);
-		return pipeline;
+		return relay;
 	}
 
 	public void start() {
diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
index e8649f66..9e7c9e09 100644
--- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
@@ -6,8 +6,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import teetime.framework.OldHeadPipeline;
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -46,35 +45,31 @@ public class TcpTraceReduction {
 	private int numWorkerThreads;
 
 	public void init() {
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		IStage tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000);
+		IStage clockStage = this.buildClockPipeline(5000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			HeadStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage());
+			IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage());
 			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return tcpReader;
 	}
 
-	private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private IStage buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -82,14 +77,10 @@ public class TcpTraceReduction {
 
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>();
-		pipeline.setFirstStage(clock);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return clock;
 	}
 
-	private HeadStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) {
+	private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
@@ -109,11 +100,7 @@ public class TcpTraceReduction {
 
 		SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
 
-		// create and configure pipeline
-		OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
-		pipeline.setFirstStage(relay);
-		pipeline.setLastStage(endStage);
-		return pipeline;
+		return relay;
 	}
 
 	public void start() {
diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
index 955057cb..0f8000e8 100644
--- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
+++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
@@ -20,7 +20,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.HeadStage;
+import teetime.framework.IStage;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
 import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
@@ -44,11 +44,11 @@ public class RecordReaderConfiguration extends AnalysisConfiguration {
 	}
 
 	private void buildConfiguration() {
-		HeadStage producerPipeline = this.buildProducerPipeline();
+		IStage producerPipeline = this.buildProducerPipeline();
 		this.getFiniteProducerStages().add(producerPipeline);
 	}
 
-	private HeadStage buildProducerPipeline() {
+	private IStage buildProducerPipeline() {
 		ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
 		File logDir = new File("src/test/data/bookstore-logs");
 		// create stages
diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
index d4b06908..6ab3f824 100644
--- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
+++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
@@ -2,7 +2,7 @@ package teetime.examples.traceReading;
 
 import java.util.List;
 
-import teetime.framework.OldHeadPipeline;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -23,7 +23,7 @@ public class TcpTraceLoggingExtAnalysis {
 	private Counter<IMonitoringRecord> recordCounter;
 	private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
 
-	private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private IStage buildClockPipeline(final long intervalDelayInMs) {
 		Clock clockStage = new Clock();
 		clockStage.setInitialDelayInMs(intervalDelayInMs);
 		clockStage.setIntervalDelayInMs(intervalDelayInMs);
@@ -31,14 +31,10 @@ public class TcpTraceLoggingExtAnalysis {
 
 		SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>();
-		pipeline.setFirstStage(clockStage);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return clockStage;
 	}
 
-	private OldHeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) {
+	private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) {
 		TcpReader tcpReader = new TcpReader();
 		this.recordCounter = new Counter<IMonitoringRecord>();
 		this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
@@ -51,19 +47,14 @@ public class TcpTraceLoggingExtAnalysis {
 
 		SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(endStage);
-		return pipeline;
+		return tcpReader;
 	}
 
 	public void init() {
-
-		OldHeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
+		IStage clockPipeline = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockPipeline));
 
-		OldHeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
+		IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 	}
 
diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
index 9a4cfff8..abeb2fa9 100644
--- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
+++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
@@ -3,7 +3,7 @@ package teetime.examples.traceReconstruction;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.OldHeadPipeline;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -41,31 +41,27 @@ public class TcpTraceReconstructionAnalysis {
 	private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
 
 	public void init() {
-		OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		IStage clockStage = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
+		IStage clock2Stage = this.buildClockPipeline(2000);
 		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
-		OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
+		IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
 		this.workerThread = new Thread(new RunnableStage(pipeline));
 	}
 
-	private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private IStage buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setIntervalDelayInMs(intervalDelayInMs);
 		Distributor<Long> distributor = new Distributor<Long>();
 
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>();
-		pipeline.setFirstStage(clock);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return clock;
 	}
 
-	private OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
+	private IStage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
 		// create stages
 		TcpReader tcpReader = new TcpReader();
 		this.recordCounter = new Counter<IMonitoringRecord>();
@@ -91,11 +87,7 @@ public class TcpTraceReconstructionAnalysis {
 		SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
 		SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<TcpReader, Sink<TraceEventRecords>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(endStage);
-		return pipeline;
+		return tcpReader;
 	}
 
 	public void start() {
diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java
index 1c171f1f..d7b33832 100644
--- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java
+++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java
@@ -4,7 +4,7 @@ import java.io.File;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.OldHeadPipeline;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -49,7 +49,7 @@ public class TraceReconstructionAnalysis {
 		Clock clockStage = this.buildClockPipeline();
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage);
+		IStage pipeline = this.buildPipeline(clockStage);
 		this.workerThread = new Thread(new RunnableStage(pipeline));
 	}
 
@@ -60,7 +60,7 @@ public class TraceReconstructionAnalysis {
 		return clock;
 	}
 
-	private OldHeadPipeline<?, ?> buildPipeline(final Clock clockStage) {
+	private IStage buildPipeline(final Clock clockStage) {
 		this.classNameRegistryRepository = new ClassNameRegistryRepository();
 
 		// create stages
@@ -98,11 +98,7 @@ public class TraceReconstructionAnalysis {
 
 		SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
 
-		// create and configure pipeline
-		OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>();
-		pipeline.setFirstStage(initialElementProducer);
-		pipeline.setLastStage(collector);
-		return pipeline;
+		return initialElementProducer;
 	}
 
 	public void start() {
diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
index 0169a2ce..15c606b4 100644
--- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
+++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
@@ -6,9 +6,9 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import teetime.framework.AbstractStage;
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.OldHeadPipeline;
-import teetime.framework.Stage;
+import teetime.framework.IStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
@@ -72,37 +72,32 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 	}
 
 	public void buildConfiguration() {
-		final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		final IStage tcpPipeline = this.buildTcpPipeline();
 		this.getFiniteProducerStages().add(tcpPipeline);
 
-		final OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		final IStage clockStage = this.buildClockPipeline(1000);
 		this.getInfiniteProducerStages().add(clockStage);
 
-		final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
+		final IStage clock2Stage = this.buildClockPipeline(2000);
 		this.getInfiniteProducerStages().add(clock2Stage);
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		for (int i = 0; i < this.numWorkerThreads; i++) {
-			OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(),
-					clock2Stage.getLastStage());
+			IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
 			this.getConsumerStages().add(pipeline);
 		}
 	}
 
-	private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return tcpReader;
 	}
 
-	private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private IStage buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -110,14 +105,10 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>();
-		pipeline.setFirstStage(clock);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return clock;
 	}
 
-	private static class StageFactory<T extends Stage> {
+	private static class StageFactory<T extends AbstractStage> {
 
 		private final Constructor<T> constructor;
 		private final List<T> stages = new ArrayList<T>();
@@ -147,8 +138,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 		}
 	}
 
-	private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline,
-			final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
+	private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
@@ -182,14 +172,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 		// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
 		SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(
-				"Worker pipeline");
-		pipeline.setFirstStage(relay);
-		// pipeline.addIntermediateStage(sysout);
-		pipeline.setLastStage(endStage);
-
-		return pipeline;
+		return relay;
 	}
 
 	public List<TraceEventRecords> getElementCollection() {
diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index daef09de..3fad6c93 100644
--- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -8,9 +8,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import teetime.framework.OldHeadPipeline;
+import teetime.framework.AbstractStage;
+import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
-import teetime.framework.Stage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
@@ -52,38 +52,34 @@ public class TcpTraceReductionAnalysisWithThreads {
 	private int numWorkerThreads;
 
 	public void init() {
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		IStage tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		IStage clockStage = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
+		IStage clock2Stage = this.buildClockPipeline(5000);
 		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
+			IStage pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
 			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private IStage buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
-		pipeline.setFirstStage(tcpReader);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return tcpReader;
 	}
 
-	private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private IStage buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -91,14 +87,10 @@ public class TcpTraceReductionAnalysisWithThreads {
 
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
-		// create and configure pipeline
-		OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>();
-		pipeline.setFirstStage(clock);
-		pipeline.setLastStage(distributor);
-		return pipeline;
+		return clock;
 	}
 
-	private static class StageFactory<T extends Stage> {
+	private static class StageFactory<T extends AbstractStage> {
 
 		private final Constructor<T> constructor;
 		private final List<T> stages = new ArrayList<T>();
@@ -152,10 +144,7 @@ public class TcpTraceReductionAnalysisWithThreads {
 		}
 	}
 
-	private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(
-			final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
-			final OldHeadPipeline<Clock, Distributor<Long>> clockStage,
-			final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage) {
+	private IStage buildPipeline(final IStage tcpReaderPipeline, final IStage clockStage, final IStage clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
@@ -187,11 +176,7 @@ public class TcpTraceReductionAnalysisWithThreads {
 		SpScPipe.connect(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
 		SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
 
-		// create and configure pipeline
-		OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
-		pipeline.setFirstStage(relay);
-		pipeline.setLastStage(endStage);
-		return pipeline;
+		return relay;
 	}
 
 	public void start() {
-- 
GitLab