diff --git a/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java b/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java index 899d55d9a98b680a5d02f134392ab33590a6233a..5d0c410e51cd1339bc32a8978d71136a6498b999 100644 --- a/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java +++ b/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java @@ -16,7 +16,8 @@ public final class CountingThroughputFilter { intervalSize = 1 * 1000 * 1000 * 1000; } - private void processEvent(final Object event, final long currentTime) { + private void processEvent(final Object event, final long currentTime, + final int increment) { final long startOfTimestampsInterval = computeFirstTimestampInInterval(currentTime); final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime); @@ -31,11 +32,15 @@ public final class CountingThroughputFilter { currentCountForCurrentInterval = 0; } - currentCountForCurrentInterval++; + currentCountForCurrentInterval += increment; } public final void inputObjects(final Object object) { - processEvent(object, System.currentTimeMillis() * 1000 * 1000); + processEvent(object, System.currentTimeMillis() * 1000 * 1000, 1); + } + + public final void inputObjectsCount(final int object) { + processEvent(object, System.currentTimeMillis() * 1000 * 1000, object); } private long computeFirstTimestampInInterval(final long timestamp) { diff --git a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java index ebe0af3ed970c4719d9c19776da4882fe196325b..c2c0a985b0cda65556ce6a3bae3fbb7f26df2dde 100644 --- a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java +++ b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java @@ -30,7 +30,7 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>( 1024); - private byte[] unreadBytes = null; + private final byte[] unreadBytes = null; private final RingBuffer<RecordEvent> ringBuffer; @@ -50,24 +50,26 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { @Override public void onEvent(final ByteArrayEvent event, final long sequence, final boolean endOfBatch) throws Exception { - final byte[] received = event.getValue(); - final int receivedLength = event.getLength(); - - byte[] messages = received; - int messagesLength = receivedLength; - - if (unreadBytes != null) { - final int unreadBytesLength = unreadBytes.length; - - messagesLength += unreadBytesLength; - messages = new byte[messagesLength]; - - System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); - System.arraycopy(received, 0, messages, unreadBytesLength, - receivedLength); - } - - unreadBytes = messagesfromByteArray(messages, messagesLength); + counter.inputObjectsCount(event.getLength() / 28); + + // final byte[] received = event.getValue(); + // final int receivedLength = event.getLength(); + // + // byte[] messages = received; + // int messagesLength = receivedLength; + // + // if (unreadBytes != null) { + // final int unreadBytesLength = unreadBytes.length; + // + // messagesLength += unreadBytesLength; + // messages = new byte[messagesLength]; + // + // System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); + // System.arraycopy(received, 0, messages, unreadBytesLength, + // receivedLength); + // } + // + // unreadBytes = messagesfromByteArray(messages, messagesLength); } private byte[] messagesfromByteArray(final byte[] b, final int readSize) { @@ -268,10 +270,10 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { private void putInRingBuffer(final IRecord record) { counter.inputObjects(record); - final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(record); - ringBuffer.publish(hiseq); + // final long hiseq = ringBuffer.next(); + // final RecordEvent valueEvent = ringBuffer.get(hiseq); + // valueEvent.setValue(record); + // ringBuffer.publish(hiseq); } public void addToRegistry(final int key, final String value) { diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 970e4be16c01bd161ea047572d56e2b45be2303a..5540f9c131b3d428cee50199e4645561b0ccfa82 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -1,9 +1,10 @@ package explorviz.hpc_monitoring.reader; -import java.io.BufferedInputStream; import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -12,20 +13,24 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public final class TCPReader { - private static final int MESSAGE_BUFFER_SIZE = 65536; + private static final int MESSAGE_BUFFER_SIZE = 65536 * 2; private final int listeningPort; - private ServerSocket serversocket; + private ServerSocketChannel serversocket; private boolean active = true; private final RingBuffer<ByteArrayEvent> ringBuffer; + private final ByteBuffer buffer; + public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) throws IllegalArgumentException { this.listeningPort = listeningPort; + buffer = ByteBuffer.allocate(MESSAGE_BUFFER_SIZE); + final ExecutorService exec = Executors.newCachedThreadPool(); final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( ByteArrayEvent.EVENT_FACTORY, 4096, exec); @@ -42,18 +47,17 @@ public final class TCPReader { open(); while (active) { // TODO only one connection! - final Socket socket = serversocket.accept(); - final BufferedInputStream bufferedInputStream = new BufferedInputStream( - socket.getInputStream(), MESSAGE_BUFFER_SIZE); + final SocketChannel socketChannel = serversocket.accept(); int readBytes = 0; - byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; - while ((readBytes = bufferedInputStream.read(messages, 0, - MESSAGE_BUFFER_SIZE)) != -1) { - putInRingBuffer(messages, readBytes); - messages = new byte[MESSAGE_BUFFER_SIZE]; + while ((readBytes = socketChannel.read(buffer)) != -1) { + // final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; + // System.arraycopy(buffer.array(), 0, messages, 0, + // buffer.position()); + putInRingBuffer(null, readBytes); + buffer.clear(); } - socket.close(); + serversocket.close(); } } catch (final IOException ex) { System.out.println("Error in read() " + ex.toString()); @@ -67,7 +71,8 @@ public final class TCPReader { } private void open() throws IOException { - serversocket = new ServerSocket(listeningPort); + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(listeningPort)); System.out.println("listening on port " + listeningPort); }