From 3757e9028a3f99f831f49296fc710b052b0faf91 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 25 Jun 2014 18:22:40 +0200
Subject: [PATCH] added RecordReaderAnalysis and corresponding stages

---
 .../stage/FileExtensionSwitch.java            |  29 +++++
 .../methodcallWithPorts/stage/Merger.java     |  74 ------------
 .../stage/basic/merger/IMergerStrategy.java   |  27 +++++
 .../stage/basic/merger/Merger.java            |  77 +++++++++++++
 .../basic/merger/RoundRobinStrategy.java      |  54 +++++++++
 .../stage/kieker/File2RecordFilter.java       | 106 ++++++++++++++++++
 .../kieker/MonitoringLogDirectory2Files.java  |  88 +++++++++++++++
 .../recordReader/RecordReaderAnalysis.java    | 103 +++++++++++++++++
 .../RecordReaderAnalysisTest.java             |  57 ++++++++++
 9 files changed, 541 insertions(+), 74 deletions(-)
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
 delete mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/IMergerStrategy.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/File2RecordFilter.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/MonitoringLogDirectory2Files.java
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
new file mode 100644
index 00000000..f57ea5f8
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
@@ -0,0 +1,29 @@
+package teetime.variant.methodcallWithPorts.stage;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
+
+import com.google.common.io.Files;
+
+public class FileExtensionSwitch extends ConsumerStage<File, File> {
+
+	private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
+
+	@Override
+	protected void execute5(final File file) {
+		String fileExtension = Files.getFileExtension(file.getAbsolutePath());
+		OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
+		outputPort.send(file);
+	}
+
+	public OutputPort<File> addFileExtension(final String fileExtension) {
+		OutputPort<File> outputPort = new OutputPort<File>();
+		this.fileExtensions.put(fileExtension, outputPort);
+		return outputPort;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java
deleted file mode 100644
index aae8c293..00000000
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Merger.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package teetime.variant.methodcallWithPorts.stage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import teetime.util.concurrent.spsc.Pow2;
-import teetime.util.list.CommittableQueue;
-import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
-import teetime.variant.methodcallWithPorts.framework.core.InputPort;
-
-public class Merger<T> extends AbstractStage<T, T> {
-
-	// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
-
-	private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
-	private int nextInputPortIndex;
-	private int size;
-	private InputPort<T>[] inputPorts;
-
-	@Override
-	protected void execute4(final CommittableQueue<T> elements) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	protected void execute5(final T element) {
-		this.send(element);
-	}
-
-	@Override
-	public void executeWithPorts() {
-		InputPort<T> inputPort = this.inputPorts[this.nextInputPortIndex % this.size];
-		T element = inputPort.receive();
-		// if (element == null) {
-		// return;
-		// }
-
-		this.nextInputPortIndex++;
-		InputPort<T> nextInputPort = this.inputPorts[this.nextInputPortIndex % this.size];
-		this.setReschedulable(nextInputPort.getPipe().size() > 0);
-
-		this.execute5(element);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void onStart() {
-		this.size = this.inputPortList.size();
-		// this.mask = this.size - 1;
-
-		int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
-		this.inputPorts = this.inputPortList.toArray(new InputPort[sizeInPow2]);
-		// System.out.println("inputPorts: " + this.inputPorts);
-	}
-
-	@Override
-	public InputPort<T> getInputPort() {
-		return this.getNewInputPort();
-	}
-
-	private InputPort<T> getNewInputPort() {
-		InputPort<T> inputPort = new InputPort<T>();
-		this.inputPortList.add(inputPort);
-		return inputPort;
-	}
-
-	@Override
-	public void onIsPipelineHead() {
-		// TODO Auto-generated method stub
-
-	}
-
-}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/IMergerStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/IMergerStrategy.java
new file mode 100644
index 00000000..09754083
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/IMergerStrategy.java
@@ -0,0 +1,27 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.stage.basic.merger;
+
+/**
+ * @author Nils Christian Ehmke
+ * 
+ * @since 1.10
+ */
+public interface IMergerStrategy<T> {
+
+	public T getNextInput(Merger<T> merger);
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
new file mode 100644
index 00000000..23238ec8
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
@@ -0,0 +1,77 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package teetime.variant.methodcallWithPorts.stage.basic.merger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import teetime.variant.explicitScheduling.framework.core.Description;
+import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+
+/**
+ * 
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ * 
+ * @param <T>
+ *            the type of the input ports and the output port
+ */
+@Description("This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.")
+public class Merger<T> extends ConsumerStage<T, T> {
+
+	// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
+
+	private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
+
+	private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
+
+	public IMergerStrategy<T> getStrategy() {
+		return this.strategy;
+	}
+
+	public void setStrategy(final IMergerStrategy<T> strategy) {
+		this.strategy = strategy;
+	}
+
+	@Override
+	protected void execute5(final T element) {
+		final T token = this.strategy.getNextInput(this);
+		if (token == null) {
+			return;
+		}
+
+		this.send(token);
+	}
+
+	@Override
+	public InputPort<T> getInputPort() {
+		return this.getNewInputPort();
+	}
+
+	private InputPort<T> getNewInputPort() {
+		InputPort<T> inputPort = new InputPort<T>();
+		this.inputPortList.add(inputPort);
+		return inputPort;
+	}
+
+	public List<InputPort<T>> getInputPortList() {
+		return this.inputPortList;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java
new file mode 100644
index 00000000..337b7662
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java
@@ -0,0 +1,54 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.stage.basic.merger;
+
+import java.util.List;
+
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+
+/**
+ * @author Nils Christian Ehmke
+ * 
+ * @since 1.10
+ */
+public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
+
+	private int index = 0;
+
+	@Override
+	public T getNextInput(final Merger<T> merger) {
+		List<InputPort<T>> inputPorts = merger.getInputPortList();
+		int size = inputPorts.size();
+		// check each port at most once to avoid a potentially infinite loop
+		while (size-- > 0) {
+			InputPort<T> inputPort = this.getNextPortInRoundRobinOrder(inputPorts);
+			final T token = inputPort.receive();
+			if (token != null) {
+				return token;
+			}
+		}
+		return null;
+	}
+
+	private InputPort<T> getNextPortInRoundRobinOrder(final List<InputPort<T>> inputPorts) {
+		InputPort<T> inputPort = inputPorts.get(this.index);
+
+		this.index = (this.index + 1) % inputPorts.size();
+
+		return inputPort;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/File2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/File2RecordFilter.java
new file mode 100644
index 00000000..6132236d
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/File2RecordFilter.java
@@ -0,0 +1,106 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.stage.kieker;
+
+import java.io.File;
+
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
+import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.stage.FileExtensionSwitch;
+import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger;
+import teetime.variant.methodcallWithPorts.stage.io.Directory2FilesFilter;
+import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryCreationFilter;
+import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
+import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.BinaryFile2RecordFilter;
+import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.DatFile2RecordFilter;
+import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.ZipFile2RecordFilter;
+
+import kieker.common.record.IMonitoringRecord;
+import kieker.common.util.filesystem.BinaryCompressionMethod;
+import kieker.common.util.filesystem.FSUtil;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class File2RecordFilter extends Pipeline<File, IMonitoringRecord> {
+
+	private ClassNameRegistryRepository classNameRegistryRepository;
+
+	/**
+	 * @since 1.10
+	 */
+	public File2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
+		this.classNameRegistryRepository = classNameRegistryRepository;
+
+		// FIXME does not yet work with more than one thread due to classNameRegistryRepository: classNameRegistryRepository is set after the ctor
+		// create stages
+		final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository);
+		final Directory2FilesFilter directory2FilesFilter = new Directory2FilesFilter();
+
+		final FileExtensionSwitch fileExtensionSwitch = new FileExtensionSwitch();
+
+		final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(this.classNameRegistryRepository);
+		final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(this.classNameRegistryRepository);
+		final ZipFile2RecordFilter zipFile2RecordFilter = new ZipFile2RecordFilter();
+
+		final Merger<IMonitoringRecord> recordMerger = new Merger<IMonitoringRecord>();
+
+		// store ports due to readability reasons
+		final OutputPort<File> normalFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.NORMAL_FILE_EXTENSION);
+		final OutputPort<File> binFileOutputPort = fileExtensionSwitch.addFileExtension(BinaryCompressionMethod.NONE.getFileExtension());
+		final OutputPort<File> zipFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.ZIP_FILE_EXTENSION);
+
+		// connect ports by pipes
+		SingleElementPipe.connect(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort());
+		SingleElementPipe.connect(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort());
+
+		SingleElementPipe.connect(normalFileOutputPort, datFile2RecordFilter.getInputPort());
+		SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort());
+		SingleElementPipe.connect(zipFileOutputPort, zipFile2RecordFilter.getInputPort());
+
+		SingleElementPipe.connect(datFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
+		SingleElementPipe.connect(binaryFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
+		SingleElementPipe.connect(zipFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
+
+		// prepare pipeline
+		this.setFirstStage(classNameRegistryCreationFilter);
+		this.addIntermediateStage(directory2FilesFilter);
+		this.addIntermediateStage(fileExtensionSwitch);
+		this.addIntermediateStage(datFile2RecordFilter);
+		this.addIntermediateStage(binaryFile2RecordFilter);
+		this.addIntermediateStage(zipFile2RecordFilter);
+		this.setLastStage(recordMerger);
+	}
+
+	/**
+	 * @since 1.10
+	 */
+	public File2RecordFilter() {
+		this(null);
+	}
+
+	public ClassNameRegistryRepository getClassNameRegistryRepository() {
+		return this.classNameRegistryRepository;
+	}
+
+	public void setClassNameRegistryRepository(final ClassNameRegistryRepository classNameRegistryRepository) {
+		this.classNameRegistryRepository = classNameRegistryRepository;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/MonitoringLogDirectory2Files.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/MonitoringLogDirectory2Files.java
new file mode 100644
index 00000000..d420b145
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/MonitoringLogDirectory2Files.java
@@ -0,0 +1,88 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.stage.kieker;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Comparator;
+
+import kieker.common.util.filesystem.BinaryCompressionMethod;
+import kieker.common.util.filesystem.FSUtil;
+
+import teetime.variant.explicitScheduling.framework.core.Context;
+import teetime.variant.explicitScheduling.framework.core.IInputPort;
+import teetime.variant.explicitScheduling.stage.io.Directory2FilesFilter;
+
+/**
+ * @author Christian Wulf
+ *
+ * @since 1.10
+ */
+public class MonitoringLogDirectory2Files extends Directory2FilesFilter {
+
+	public final IInputPort<Directory2FilesFilter, String> filePrefixInputPort = this.createInputPort();
+
+	/**
+	 * @author Christian Wulf
+	 *
+	 * @since 1.10
+	 */
+	static class MonitoringLogFileFilter implements FileFilter {
+		private String filePrefix;
+
+		@Override
+		public boolean accept(final File pathname) {
+			final String name = pathname.getName();
+			return pathname.isFile()
+					&& name.startsWith(this.filePrefix)
+					&& (name.endsWith(FSUtil.NORMAL_FILE_EXTENSION) || BinaryCompressionMethod.hasValidFileExtension(name));
+		}
+
+		public String getFilePrefix() {
+			return this.filePrefix;
+		}
+
+		public void setFilePrefix(final String filePrefix) {
+			this.filePrefix = filePrefix;
+		}
+	}
+
+	private static final Comparator<File> FILE_COMPARATOR = new Comparator<File>() {
+		@Override
+		public final int compare(final File f1, final File f2) {
+			return f1.compareTo(f2); // simplified (we expect no dirs!)
+		}
+	};
+
+	/**
+	 * @since 1.10
+	 */
+	public MonitoringLogDirectory2Files() {
+		super(new MonitoringLogFileFilter(), FILE_COMPARATOR);
+	}
+
+	@Override
+	protected boolean execute(final Context<Directory2FilesFilter> context) {
+		final String filePrefix = context.tryTake(this.filePrefixInputPort);
+		if (filePrefix == null) {
+			return false;
+		}
+
+		((MonitoringLogFileFilter) this.getFilter()).setFilePrefix(filePrefix);
+
+		return super.execute(context);
+	}
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java
new file mode 100644
index 00000000..1c4dee83
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java
@@ -0,0 +1,103 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.examples.recordReader;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+
+import teetime.util.ConstructorClosure;
+import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
+import teetime.variant.explicitScheduling.framework.core.Analysis;
+import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.stage.CollectorSink;
+import teetime.variant.methodcallWithPorts.stage.kieker.File2RecordFilter;
+import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
+
+import kieker.common.record.IMonitoringRecord;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class RecordReaderAnalysis extends Analysis {
+
+	private int numInputObjects;
+	private ConstructorClosure<TimestampObject> inputObjectCreator;
+	private int numNoopFilters;
+
+	private final List<IMonitoringRecord> timestampObjectsList = new LinkedList<IMonitoringRecord>();
+
+	private Thread producerThread;
+
+	private ClassNameRegistryRepository classNameRegistryRepository;
+
+	@Override
+	public void init() {
+		super.init();
+		Pipeline<File, Object> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
+		this.producerThread = new Thread(new RunnableStage(producerPipeline));
+	}
+
+	private Pipeline<File, Object> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
+		this.classNameRegistryRepository = new ClassNameRegistryRepository();
+		// create stages
+		File2RecordFilter file2RecordFilter = new File2RecordFilter(this.classNameRegistryRepository);
+		CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.timestampObjectsList);
+
+		final Pipeline<File, Object> pipeline = new Pipeline<File, Object>();
+		pipeline.setFirstStage(file2RecordFilter);
+		pipeline.setLastStage(collector);
+
+		SingleElementPipe.connect(file2RecordFilter.getOutputPort(), collector.getInputPort());
+
+		return pipeline;
+	}
+
+	@Override
+	public void start() {
+		super.start();
+
+		this.producerThread.start();
+
+		try {
+			this.producerThread.join();
+		} catch (InterruptedException e) {
+			throw new IllegalStateException(e);
+		}
+	}
+
+	public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
+		this.numInputObjects = numInputObjects;
+		this.inputObjectCreator = inputObjectCreator;
+	}
+
+	public int getNumNoopFilters() {
+		return this.numNoopFilters;
+	}
+
+	public void setNumNoopFilters(final int numNoopFilters) {
+		this.numNoopFilters = numNoopFilters;
+	}
+
+	public List<IMonitoringRecord> getTimestampObjectsList() {
+		return this.timestampObjectsList;
+	}
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java
new file mode 100644
index 00000000..9eddc8fa
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java
@@ -0,0 +1,57 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.examples.recordReader;
+
+import org.junit.Test;
+
+import teetime.util.ConstructorClosure;
+import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
+import test.PerformanceTest;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class RecordReaderAnalysisTest extends PerformanceTest {
+
+	@Test
+	public void performAnalysis(final int numThreads) {
+		System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+				+ NUM_NOOP_FILTERS + "...");
+
+		final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
+		analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
+		analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
+			@Override
+			public TimestampObject create() {
+				return new TimestampObject();
+			}
+		});
+		analysis.init();
+
+		this.stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			this.stopWatch.end();
+			analysis.onTerminate();
+		}
+
+		// this.timestampObjects = analysis.getTimestampObjectsList();
+	}
+
+}
-- 
GitLab