From 4f6db509f9251d801212b236cf209f71b6b057cd Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 15 Aug 2014 10:59:53 +0200
Subject: [PATCH] added further pipe factories

---
 .../framework/core/pipe/IPipe.java            |  3 ++
 .../framework/core/pipe/IPipeFactory.java     |  9 ++++++
 .../core/pipe/OrderedGrowableArrayPipe.java   | 10 ++++--
 .../pipe/OrderedGrowableArrayPipeFactory.java | 18 +++++++++++
 .../core/pipe/OrderedGrowablePipe.java        | 11 +++++--
 .../framework/core/pipe/Pipe.java             | 19 ++++++++----
 .../framework/core/pipe/PipeFactory.java      | 15 ++++-----
 .../core/pipe/SingleElementPipe.java          | 10 ++++--
 .../core/pipe/SingleElementPipeFactory.java   | 15 +++++++++
 .../framework/core/pipe/SpScPipe.java         |  8 +++--
 .../framework/core/pipe/SpScPipeFactory.java  | 17 ++++++++++
 .../core/pipe/UnorderedGrowablePipe.java      | 10 ++++--
 .../pipe/UnorderedGrowablePipeFactory.java    | 31 +++++++++++++++++++
 .../RecordReaderConfiguration.java            | 22 +++++++++++--
 14 files changed, 172 insertions(+), 26 deletions(-)
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
index e09d4fe8..e7d500b5 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
@@ -1,6 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.Signal;
 
 public interface IPipe<T> {
@@ -25,4 +26,6 @@ public interface IPipe<T> {
 
 	void setSignal(Signal signal);
 
+	void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
index 8e55a094..b0b1b7a8 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
@@ -1,7 +1,16 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+
 public interface IPipeFactory {
 
 	<T> IPipe<T> create(int capacity);
 
+	ThreadCommunication getThreadCommunication();
+
+	PipeOrdering getOrdering();
+
+	boolean isGrowable();
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index d289f4c8..270556dc 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -18,10 +18,16 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
 		this.elements = new CircularArray<T>(initialCapacity);
 	}
 
+	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new OrderedGrowableArrayPipe<T>();
-		sourcePort.setPipe(pipe);
-		targetPort.setPipe(pipe);
+		pipe.connectPorts(sourcePort, targetPort);
+	}
+
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
index 80445efe..b7d10d2e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
@@ -1,5 +1,8 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+
 public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
 
 	/**
@@ -10,4 +13,19 @@ public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
 		return new OrderedGrowableArrayPipe<T>();
 	}
 
+	@Override
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTRA;
+	}
+
+	@Override
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.QUEUE_BASED;
+	}
+
+	@Override
+	public boolean isGrowable() {
+		return true;
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
index a69db373..4c22e36c 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
@@ -17,10 +17,16 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
 		this.elements = new LinkedList<T>();
 	}
 
+	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new OrderedGrowablePipe<T>();
-		sourcePort.setPipe(pipe);
-		targetPort.setPipe(pipe);
+		pipe.connectPorts(sourcePort, targetPort);
+	}
+
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
@@ -48,4 +54,5 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
 	public int size() {
 		return this.elements.size();
 	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
index 8d8053f6..eefd95f7 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
@@ -8,16 +8,22 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
 
+	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new Pipe<T>();
-		sourcePort.setPipe(pipe);
-		targetPort.setPipe(pipe);
+		pipe.connectPorts(sourcePort, targetPort);
+	}
+
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
 	/*
 	 * (non-Javadoc)
-	 * 
+	 *
 	 * @see teetime.examples.throughput.methodcall.IPipe#add(T)
 	 */
 	@Override
@@ -29,7 +35,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 * 
+	 *
 	 * @see teetime.examples.throughput.methodcall.IPipe#removeLast()
 	 */
 	@Override
@@ -41,7 +47,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 * 
+	 *
 	 * @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
 	 */
 	@Override
@@ -51,7 +57,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 * 
+	 *
 	 * @see teetime.examples.throughput.methodcall.IPipe#readLast()
 	 */
 	@Override
@@ -67,4 +73,5 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 	public int size() {
 		return this.elements.size();
 	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
index 5534a69d..1bc509be 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
@@ -9,7 +9,7 @@ public class PipeFactory {
 		INTER, INTRA
 	}
 
-	public enum Ordering {
+	public enum PipeOrdering {
 		/**
 		 * FIFO
 		 */
@@ -25,27 +25,28 @@ public class PipeFactory {
 
 	/**
 	 * Creates a new FIFO-ordered, growable pipe with an initial capacity of 1. <br>
-	 * <i>This method is suitable for most programmers.</i>
+	 * <i>This method is suitable for most situations.</i>
 	 *
 	 * @param tc
 	 * @return
 	 */
 	public <T> IPipe<T> create(final ThreadCommunication tc) {
-		return this.create(tc, Ordering.QUEUE_BASED, true, 1);
+		return this.create(tc, PipeOrdering.QUEUE_BASED, true, 1);
 	}
 
-	public <T> IPipe<T> create(final ThreadCommunication tc, final Ordering ordering, final boolean growable, final int capacity) {
+	public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
 		String key = this.buildKey(tc, ordering, growable);
 		IPipeFactory pipeClass = this.pipeFactories.get(key);
 		return pipeClass.create(capacity);
 	}
 
-	private String buildKey(final ThreadCommunication tc, final Ordering ordering, final boolean growable) {
+	private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
 		return tc.toString() + ordering.toString() + growable;
 	}
 
-	public void register(final IPipeFactory pipeFactory, final ThreadCommunication tc, final Ordering ordering, final boolean growable) {
-		String key = this.buildKey(tc, ordering, growable);
+	public void register(final IPipeFactory pipeFactory) {
+		String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
 		this.pipeFactories.put(key, pipeFactory);
 	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index c5e02be9..aebe4b58 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -7,10 +7,16 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> {
 
 	private T element;
 
+	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new SingleElementPipe<T>();
-		sourcePort.setPipe(pipe);
-		targetPort.setPipe(pipe);
+		pipe.connectPorts(sourcePort, targetPort);
+	}
+
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
index a6f83738..f557d3e7 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
@@ -1,5 +1,8 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+
 public class SingleElementPipeFactory implements IPipeFactory {
 
 	/**
@@ -10,4 +13,16 @@ public class SingleElementPipeFactory implements IPipeFactory {
 		return new SingleElementPipe<T>();
 	}
 
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTRA;
+	}
+
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.ARBITRARY;
+	}
+
+	public boolean isGrowable() {
+		return false;
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 1c8994cc..ef956895 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -24,16 +24,18 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 		this.queue = QueueFactory.newQueue(concurrentQueueSpec);
 	}
 
+	@Deprecated
 	public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		SpScPipe<T> pipe = new SpScPipe<T>(capacity);
-		return pipe.connect(sourcePort, targetPort);
+		pipe.connectPorts(sourcePort, targetPort);
+		return pipe;
 	}
 
-	public SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		targetPort.setPipe(this);
 		sourcePort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-		return this;
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
index 745823c1..25d681d2 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
@@ -1,5 +1,8 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+
 public class SpScPipeFactory implements IPipeFactory {
 
 	@Override
@@ -7,4 +10,18 @@ public class SpScPipeFactory implements IPipeFactory {
 		return new SpScPipe<T>(capacity);
 	}
 
+	@Override
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTER;
+	}
+
+	@Override
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.QUEUE_BASED;
+	}
+
+	@Override
+	public boolean isGrowable() {
+		return false;
+	}
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index 2b908268..71cc7e87 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -17,10 +17,16 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 		this.elements = (T[]) new Object[this.MIN_CAPACITY];
 	}
 
+	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new UnorderedGrowablePipe<T>();
-		sourcePort.setPipe(pipe);
-		targetPort.setPipe(pipe);
+		pipe.connectPorts(sourcePort, targetPort);
+	}
+
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
 		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java
new file mode 100644
index 00000000..362559e9
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java
@@ -0,0 +1,31 @@
+package teetime.variant.methodcallWithPorts.framework.core.pipe;
+
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+
+public class UnorderedGrowablePipeFactory implements IPipeFactory {
+
+	/**
+	 * Hint: The capacity for this pipe implementation is ignored
+	 */
+	@Override
+	public <T> IPipe<T> create(final int capacity) {
+		return new UnorderedGrowablePipe<T>();
+	}
+
+	@Override
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTRA;
+	}
+
+	@Override
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.STACK_BASED;
+	}
+
+	@Override
+	public boolean isGrowable() {
+		return true;
+	}
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
index f1adf927..149f97cb 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
@@ -22,8 +22,15 @@ import java.util.List;
 import teetime.variant.methodcallWithPorts.framework.core.Configuration;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipeFactory;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipeFactory;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipeFactory;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipeFactory;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
 import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
 import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
@@ -38,6 +45,16 @@ import kieker.common.record.IMonitoringRecord;
 public class RecordReaderConfiguration extends Configuration {
 
 	private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
+	private final PipeFactory pipeFactory;
+
+	public RecordReaderConfiguration() {
+		// BETTER instantiate one single pipe factory for all analyzes and register all available pipe implementations once
+		this.pipeFactory = new PipeFactory();
+		this.pipeFactory.register(new SingleElementPipeFactory());
+		this.pipeFactory.register(new OrderedGrowableArrayPipeFactory());
+		this.pipeFactory.register(new UnorderedGrowablePipeFactory());
+		this.pipeFactory.register(new SpScPipeFactory());
+	}
 
 	public void buildConfiguration() {
 		StageWithPort producerPipeline = this.buildProducerPipeline();
@@ -54,7 +71,8 @@ public class RecordReaderConfiguration extends Configuration {
 		pipeline.setFirstStage(dir2RecordsFilter);
 		pipeline.setLastStage(collector);
 
-		SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
+		IPipe<IMonitoringRecord> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
+		pipe.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
 
 		dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
 		dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs"));
-- 
GitLab