diff --git a/src/main/java/teetime/stage/io/network/TcpReaderStage.java b/src/main/java/teetime/stage/io/network/TcpReaderStage.java index f65ba5f87ca37c96a98c87deb93f65df4fa73be7..0a6f71dcf474f6102bfe5deee22ac4e1f545ecef 100644 --- a/src/main/java/teetime/stage/io/network/TcpReaderStage.java +++ b/src/main/java/teetime/stage/io/network/TcpReaderStage.java @@ -15,10 +15,16 @@ ***************************************************************************/ package teetime.stage.io.network; +import java.nio.ByteBuffer; + import teetime.framework.AbstractProducerStage; import teetime.util.network.AbstractRecordTcpReader; +import teetime.util.network.AbstractTcpReader; import kieker.common.record.IMonitoringRecord; +import kieker.common.record.misc.RegistryRecord; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. @@ -26,58 +32,75 @@ import kieker.common.record.IMonitoringRecord; * @author Jan Waller, Nils Christian Ehmke, Christian Wulf * */ -public class TcpReaderStage extends AbstractProducerStage<IMonitoringRecord> { +public final class TcpReaderStage extends AbstractProducerStage<IMonitoringRecord> { + + private final ILookup<String> stringRegistry = new Lookup<String>(); + private final AbstractRecordTcpReader tcpMonitoringRecordReader; + private final AbstractTcpReader tcpStringRecordReader; - private final AbstractRecordTcpReader recordTcpReader; + private Thread tcpStringRecordReaderThread; /** - * Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code> + * Default constructor with <code>port1=10133</code>, <code>bufferCapacity=65535</code>, and <code>port2=10134</code> */ public TcpReaderStage() { - this(10133, 65535); + this(10133, 65535, 10134); } /** * - * @param port - * accept connections on this port + * @param port1 + * used to accept <code>IMonitoringRecord</code>s + * @param port2 + * used to accept <code>StringRecord</code>s * @param bufferCapacity * capacity of the receiving buffer */ - public TcpReaderStage(final int port, final int bufferCapacity) { + public TcpReaderStage(final int port1, final int bufferCapacity, final int port2) { super(); - this.recordTcpReader = new AbstractRecordTcpReader(port, bufferCapacity, logger) { + + this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, logger, stringRegistry) { @Override - protected void send(final IMonitoringRecord record) { + protected void onRecordReceived(final IMonitoringRecord record) { outputPort.send(record); } }; + + this.tcpStringRecordReader = new AbstractTcpReader(port2, bufferCapacity, logger) { + @Override + protected boolean onBufferReceived(final ByteBuffer buffer) { + RegistryRecord.registerRecordInRegistry(buffer, stringRegistry); + return true; + } + }; } @Override public void onStarting() throws Exception { super.onStarting(); - recordTcpReader.initialize(); + this.tcpStringRecordReaderThread = new Thread(tcpStringRecordReader); + this.tcpStringRecordReaderThread.start(); } @Override protected void execute() { - recordTcpReader.execute(); + tcpMonitoringRecordReader.run(); terminate(); } @Override public void onTerminating() throws Exception { - this.recordTcpReader.terminate(); + this.tcpStringRecordReader.terminate(); + this.tcpStringRecordReaderThread.interrupt(); super.onTerminating(); } public int getPort1() { - return recordTcpReader.getPort(); + return tcpMonitoringRecordReader.getPort(); } public int getPort2() { - return recordTcpReader.getPort2(); + return tcpStringRecordReader.getPort(); } } diff --git a/src/main/java/teetime/util/network/AbstractRecordTcpReader.java b/src/main/java/teetime/util/network/AbstractRecordTcpReader.java index 3d9e5beec373a25befa1d1f1c46c6d039834cab9..2086802f0158ba252282b4e8cd46b049533cc35b 100644 --- a/src/main/java/teetime/util/network/AbstractRecordTcpReader.java +++ b/src/main/java/teetime/util/network/AbstractRecordTcpReader.java @@ -1,25 +1,15 @@ package teetime.util.network; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import org.slf4j.Logger; import kieker.common.exception.RecordInstantiationException; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; import kieker.common.record.factory.CachedRecordFactoryCatalog; import kieker.common.record.factory.IRecordFactory; -import kieker.common.record.misc.RegistryRecord; import kieker.common.util.registry.ILookup; -import kieker.common.util.registry.Lookup; public abstract class AbstractRecordTcpReader extends AbstractTcpReader { @@ -28,15 +18,13 @@ public abstract class AbstractRecordTcpReader extends AbstractTcpReader { private final CachedRecordFactoryCatalog recordFactories = CachedRecordFactoryCatalog.getInstance(); // BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary. - private final ILookup<String> stringRegistry = new Lookup<String>(); - private int port2 = 10134; - private TCPStringReader tcpStringReader; + private final ILookup<String> stringRegistry; /** * Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code> */ - public AbstractRecordTcpReader(final Logger logger) { - this(10133, 65535, logger); + public AbstractRecordTcpReader(final Logger logger, final ILookup<String> stringRegistry) { + this(10133, 65535, logger, stringRegistry); } /** @@ -46,17 +34,13 @@ public abstract class AbstractRecordTcpReader extends AbstractTcpReader { * @param bufferCapacity * capacity of the receiving buffer */ - public AbstractRecordTcpReader(final int port, final int bufferCapacity, final Logger logger) { + public AbstractRecordTcpReader(final int port, final int bufferCapacity, final Logger logger, final ILookup<String> stringRegistry) { super(port, bufferCapacity, logger); - } - - public void initialize() { - this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); - this.tcpStringReader.start(); + this.stringRegistry = stringRegistry; } @Override - protected final boolean read(final ByteBuffer buffer) { + protected final boolean onBufferReceived(final ByteBuffer buffer) { // identify record class if (buffer.remaining() < INT_BYTES) { return false; @@ -80,7 +64,7 @@ public abstract class AbstractRecordTcpReader extends AbstractTcpReader { final IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); - send(record); + onRecordReceived(record); } catch (final RecordInstantiationException ex) { super.logger.error("Failed to create: " + recordClassName, ex); } @@ -88,91 +72,6 @@ public abstract class AbstractRecordTcpReader extends AbstractTcpReader { return true; } - protected abstract void send(IMonitoringRecord record); - - public void terminate() { - this.tcpStringReader.terminate(); - } - - /** - * - * @author Jan Waller - * - * @since 1.8 - */ - protected static class TCPStringReader extends Thread { - - private static final int MESSAGE_BUFFER_SIZE = 65535; - - private static final Log LOG = LogFactory.getLog(TCPStringReader.class); - - private final int port; - private final ILookup<String> stringRegistry; - private volatile boolean terminated = false; // NOPMD - private volatile Thread readerThread; - - public TCPStringReader(final int port, final ILookup<String> stringRegistry) { - this.port = port; - this.stringRegistry = stringRegistry; - } - - public void terminate() { - this.terminated = true; - this.readerThread.interrupt(); - } - - @Override - public void run() { - this.readerThread = Thread.currentThread(); - ServerSocketChannel serversocket = null; - try { - serversocket = ServerSocketChannel.open(); - serversocket.socket().bind(new InetSocketAddress(this.port)); - if (LOG.isDebugEnabled()) { - LOG.debug("Listening on port " + this.port); - } - // BEGIN also loop this one? - final SocketChannel socketChannel = serversocket.accept(); - final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); - while ((socketChannel.read(buffer) != -1) && (!this.terminated)) { - buffer.flip(); - try { - while (buffer.hasRemaining()) { - buffer.mark(); - RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry); - } - buffer.clear(); - } catch (final BufferUnderflowException ex) { - buffer.reset(); - buffer.compact(); - } - } - socketChannel.close(); - // END also loop this one? - } catch (final ClosedByInterruptException ex) { - LOG.warn("Reader interrupted", ex); - } catch (final IOException ex) { - LOG.error("Error while reading", ex); - } finally { - if (null != serversocket) { - try { - serversocket.close(); - } catch (final IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to close TCP connection!", e); - } - } - } - } - } - } - - public int getPort2() { - return this.port2; - } - - public void setPort2(final int port2) { - this.port2 = port2; - } + protected abstract void onRecordReceived(IMonitoringRecord record); } diff --git a/src/main/java/teetime/util/network/AbstractTcpReader.java b/src/main/java/teetime/util/network/AbstractTcpReader.java index eaa50b521147a258e5f1a60711aff2a881a45802..11db64b871c0caa59d887e2410be285f9e8bd617 100644 --- a/src/main/java/teetime/util/network/AbstractTcpReader.java +++ b/src/main/java/teetime/util/network/AbstractTcpReader.java @@ -9,13 +9,22 @@ import java.nio.channels.SocketChannel; import org.slf4j.Logger; -public abstract class AbstractTcpReader { +/** + * + * @author Christian Wulf + * @deprecated use AbstractTcpReader from teetime instead + * + */ +@Deprecated +public abstract class AbstractTcpReader implements Runnable { private final int port; private final int bufferCapacity; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; + private volatile boolean terminated; + public AbstractTcpReader(final int port, final int bufferCapacity, final Logger logger) { super(); this.port = port; @@ -23,7 +32,8 @@ public abstract class AbstractTcpReader { this.logger = logger; } - public final void execute() { + @Override + public final void run() { ServerSocketChannel serversocket = null; try { serversocket = ServerSocketChannel.open(); @@ -35,7 +45,7 @@ public abstract class AbstractTcpReader { final SocketChannel socketChannel = serversocket.accept(); try { final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferCapacity); - while (socketChannel.read(buffer) != -1) { + while (socketChannel.read(buffer) != -1 && !terminated) { process(buffer); } } finally { @@ -59,7 +69,7 @@ public abstract class AbstractTcpReader { try { while (buffer.hasRemaining()) { buffer.mark(); - boolean success = this.read(buffer); + boolean success = this.onBufferReceived(buffer); if (!success) { buffer.reset(); buffer.compact(); @@ -81,7 +91,11 @@ public abstract class AbstractTcpReader { * <li><code>true</code> when there were enough bytes to perform the read operation * <li><code>false</code> otherwise. In this case, the buffer is reset, compacted, and filled with new content. */ - protected abstract boolean read(final ByteBuffer buffer); + protected abstract boolean onBufferReceived(final ByteBuffer buffer); + + public void terminate() { + this.terminated = true; + } public int getPort() { return port;