diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java index 975cd920463e3649927e680c0041c8414af8d327..164c0006173ee9577b1fb567db9c21076dfc8663 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -1,7 +1,6 @@ package explorviz.hpc_monitoring.reader; import java.io.IOException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -68,50 +67,79 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private final void messagesfromByteArray(final ByteBuffer buffer) { while (buffer.remaining() > 0) { - buffer.mark(); - try { - final byte clazzId = buffer.get(); - - switch (clazzId) { - case HostApplicationMetaData.CLAZZ_ID: { + final byte clazzId = buffer.get(); + switch (clazzId) { + case HostApplicationMetaData.CLAZZ_ID: { + if (buffer.remaining() >= (HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInTraceMetadata(buffer); - break; + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - case BeforeOperationEvent.CLAZZ_ID: { + break; + } + case BeforeOperationEvent.CLAZZ_ID: { + if (buffer.remaining() >= (BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInBeforeOperationEvent(buffer); - break; + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - case AfterFailedOperationEvent.CLAZZ_ID: { + break; + } + case AfterFailedOperationEvent.CLAZZ_ID: { + if (buffer.remaining() >= (AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInAfterFailedOperationEvent(buffer); - break; + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - case AfterOperationEvent.CLAZZ_ID: { + break; + } + case AfterOperationEvent.CLAZZ_ID: { + if (buffer.remaining() >= (AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInAfterOperationEvent(buffer); - break; + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + break; + } + case StringRegistryRecord.CLAZZ_ID: { + int mapId = 0; + int stringLength = 0; + if (buffer.remaining() >= 8) { + mapId = buffer.getInt(); + stringLength = buffer.getInt(); + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - case StringRegistryRecord.CLAZZ_ID: { - final int mapId = buffer.getInt(); - final int stringLength = buffer.getInt(); + if (buffer.remaining() >= stringLength) { final byte[] stringByteArray = new byte[stringLength]; buffer.get(stringByteArray); addToRegistry(mapId, new String(stringByteArray)); - break; - } - default: { - System.out.println("unknown class id " + clazzId + " at offset " - + (buffer.position() - 4)); - buffer.reset(); + } else { + buffer.position(buffer.position() - 9); buffer.compact(); return; } + break; + } + default: { + System.out.println("unknown class id " + clazzId + " at offset " + + (buffer.position() - 4)); + buffer.clear(); + return; } - } catch (final BufferUnderflowException e) { - buffer.reset(); - buffer.compact(); - return; } } @@ -230,12 +258,12 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private final void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - synchronized (this) { // TODO remove - outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - flushOutputBuffer(); - } - } + // synchronized (this) { // TODO remove + // outputBuffer[outputBufferIndex++] = message; + // if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + // flushOutputBuffer(); + // } + // } } private void flushOutputBuffer() {