diff --git a/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipe.java b/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipe.java
new file mode 100644
index 0000000000000000000000000000000000000000..635f483a0b9f1d99175202dbd565919a0055cefe
--- /dev/null
+++ b/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipe.java
@@ -0,0 +1,38 @@
+package teetime.framework.pipe;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import teetime.framework.AbstractIntraThreadPipe;
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+
+public final class ConcurrentBlockingIntraThreadPipe<T> extends AbstractIntraThreadPipe {
+
+	private final ConcurrentLinkedQueue<Object> queue;
+
+	ConcurrentBlockingIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
+		queue = new ConcurrentLinkedQueue<Object>();
+	}
+
+	@Override
+	public boolean add(final Object element) {
+		return queue.add(element);
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return queue.isEmpty();
+	}
+
+	@Override
+	public int size() {
+		return queue.size();
+	}
+
+	@Override
+	public Object removeLast() {
+		return queue.poll();
+	}
+
+}
diff --git a/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipeFactory.java b/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipeFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..71a2484d3a91fdd943bee5dea87afa3cfe3062fc
--- /dev/null
+++ b/src/main/java/teetime/framework/pipe/ConcurrentBlockingIntraThreadPipeFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.framework.pipe;
+
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
+
+public final class ConcurrentBlockingIntraThreadPipeFactory implements IPipeFactory {
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return this.create(sourcePort, targetPort, 4);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new ConcurrentBlockingIntraThreadPipe<T>(sourcePort, targetPort);
+	}
+
+	@Override
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTRA;
+	}
+
+	@Override
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.QUEUE_BASED;
+	}
+
+	@Override
+	public boolean isGrowable() {
+		return true;
+	}
+
+}