From 849905df8d48cd5200ba29a494bfa33342495315 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Mon, 3 Nov 2014 21:25:05 +0100
Subject: [PATCH] added AbstractTcpReader

---
 .../teetime/stage/io/AbstractTcpReader.java   | 71 +++++++++++++++++++
 1 file changed, 71 insertions(+)
 create mode 100644 src/main/java/teetime/stage/io/AbstractTcpReader.java

diff --git a/src/main/java/teetime/stage/io/AbstractTcpReader.java b/src/main/java/teetime/stage/io/AbstractTcpReader.java
new file mode 100644
index 00000000..daf0a31d
--- /dev/null
+++ b/src/main/java/teetime/stage/io/AbstractTcpReader.java
@@ -0,0 +1,71 @@
+package teetime.stage.io;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import teetime.framework.ProducerStage;
+
+public abstract class AbstractTcpReader<T> extends ProducerStage<T> {
+
+	private final int port;
+	private final int bufferCapacity;
+
+	public AbstractTcpReader(final int port, final int bufferCapacity) {
+		super();
+		this.port = port;
+		this.bufferCapacity = bufferCapacity;
+	}
+
+	@Override
+	protected void execute() {
+		ServerSocketChannel serversocket = null;
+		try {
+			serversocket = ServerSocketChannel.open();
+			serversocket.socket().bind(new InetSocketAddress(this.port));
+			logger.debug("Listening on port " + this.port);
+
+			final SocketChannel socketChannel = serversocket.accept();
+			try {
+				final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferCapacity);
+				while (socketChannel.read(buffer) != -1) {
+					process(buffer);
+				}
+			} finally {
+				socketChannel.close();
+			}
+		} catch (final IOException ex) {
+			logger.error("Error while reading", ex);
+		} finally {
+			if (null != serversocket) {
+				try {
+					serversocket.close();
+				} catch (final IOException e) {
+					logger.debug("Failed to close TCP connection!", e);
+				}
+			}
+
+			this.terminate();
+		}
+	}
+
+	private void process(final ByteBuffer buffer) {
+		buffer.flip();
+		try {
+			while (buffer.hasRemaining()) {
+				buffer.mark();
+				this.read(buffer);
+			}
+			buffer.clear();
+		} catch (final BufferUnderflowException ex) {
+			buffer.reset();
+			buffer.compact();
+		}
+	}
+
+	protected abstract void read(final ByteBuffer buffer);
+
+}
-- 
GitLab