Skip to content
Snippets Groups Projects
Commit 06a152a1 authored by Christian Wulf's avatar Christian Wulf
Browse files

removed stage AbstractTcpReader;

added util class AbstractTcpReader
parent 5b72ab03
No related branches found
No related tags found
No related merge requests found
#FindBugs User Preferences
#Fri Dec 19 13:43:52 CET 2014
#Wed Jan 28 07:11:46 CET 2015
detector_threshold=3
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
package teetime.stage.io;
package teetime.util.io.network;
import java.io.IOException;
import java.net.InetSocketAddress;
......@@ -7,31 +7,38 @@ import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import teetime.framework.AbstractProducerStage;
import org.slf4j.Logger;
public abstract class AbstractTcpReader<T> extends AbstractProducerStage<T> {
public abstract class AbstractTcpReader implements Runnable {
private final int port;
private final int bufferCapacity;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
public AbstractTcpReader(final int port, final int bufferCapacity) {
private volatile boolean terminated;
public AbstractTcpReader(final int port, final int bufferCapacity, final Logger logger) {
super();
this.port = port;
this.bufferCapacity = bufferCapacity;
this.logger = logger;
}
@Override
protected void execute() {
public final void run() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
logger.debug("Listening on port " + this.port);
if (logger.isDebugEnabled()) {
logger.debug("Listening on port " + this.port);
}
final SocketChannel socketChannel = serversocket.accept();
try {
final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferCapacity);
while (socketChannel.read(buffer) != -1) {
while (socketChannel.read(buffer) != -1 && !terminated) {
process(buffer);
}
} finally {
......@@ -47,8 +54,6 @@ public abstract class AbstractTcpReader<T> extends AbstractProducerStage<T> {
logger.debug("Failed to close TCP connection.", e);
}
}
this.terminate();
}
}
......@@ -57,7 +62,7 @@ public abstract class AbstractTcpReader<T> extends AbstractProducerStage<T> {
try {
while (buffer.hasRemaining()) {
buffer.mark();
boolean success = this.read(buffer);
boolean success = this.onBufferReceived(buffer);
if (!success) {
buffer.reset();
buffer.compact();
......@@ -66,7 +71,7 @@ public abstract class AbstractTcpReader<T> extends AbstractProducerStage<T> {
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
logger.warn("Unexpected exception. Resetting and compacting buffer.", ex);
logger.warn("Unexpected buffer underflow. Resetting and compacting buffer.", ex);
buffer.reset();
buffer.compact();
}
......@@ -79,6 +84,13 @@ public abstract class AbstractTcpReader<T> extends AbstractProducerStage<T> {
* <li><code>true</code> when there were enough bytes to perform the read operation
* <li><code>false</code> otherwise. In this case, the buffer is reset, compacted, and filled with new content.
*/
protected abstract boolean read(final ByteBuffer buffer);
protected abstract boolean onBufferReceived(final ByteBuffer buffer);
public void terminate() {
this.terminated = true;
}
public int getPort() {
return port;
}
}
wiki @ 0e447457
Subproject commit a93581905ef7b0584d52eae1898148ffa6201a31
Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment