diff --git a/Start Master (testing only).launch b/Start Master (testing only).launch index 803bd3b4486e8ea4853f474e0ca60b1012026170..92ef4480469f95b69dddcabcc9929f542185459e 100644 --- a/Start Master (testing only).launch +++ b/Start Master (testing only).launch @@ -11,5 +11,5 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> -<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/> </launchConfiguration> diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index 13bbca69ba33fe55346fb4b82bf22f6df9fadbcf..9b07a18700e87088dd5f5aca3bef856994b430bf 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -16,10 +16,12 @@ import explorviz.live_trace_processing.record.ISerializableRecord; import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.writer.IRecordSender; import explorviz.live_trace_processing.writer.IStringRecordSender; import explorviz.live_trace_processing.writer.IWriter; -public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender { +public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender, + IRecordSender { private URL providerURL; private SocketChannel socketChannel; @@ -66,7 +68,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord @Override public void sendOutStringRecord(final StringRegistryRecord record) { final ByteBuffer stringBuffer = ByteBuffer.allocateDirect(record.getRecordSizeInBytes()); - record.putIntoByteBuffer(stringBuffer, stringRegistry); + record.putIntoByteBuffer(stringBuffer, stringRegistry, this); send(stringBuffer); } @@ -79,7 +81,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord System.out.println("required: " + serializableRecord.getRecordSizeInBytes() + " and capacity is " + buffer.capacity()); } - serializableRecord.putIntoByteBuffer(buffer, stringRegistry); + serializableRecord.putIntoByteBuffer(buffer, stringRegistry, this); } else if (record instanceof TimedPeriodRecord) { if (buffer.hasRemaining()) { send(buffer); @@ -89,7 +91,8 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord } } - private void send(final ByteBuffer buffer) { + @Override + public void send(final ByteBuffer buffer) { while ((socketChannel == null) || (!socketChannel.isConnected())) { try { Thread.sleep(1); diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 094dc86976846579c2c5d2cee8a0fe92c508a386..95d1bab1ae57a1a6d2fd4c45938af47d1b9f53fc 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -99,6 +99,7 @@ class TraceReconstructionBuffer { } } } - return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord, 1); + return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord, + 1, events.size()); } } diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index e4624cd59ed0ba7ee7cd7fc7fc36dac5fa2869ad..b5c34d16eab235092edbe0d7437969b2b6ab8a8d 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -62,6 +62,8 @@ class TCPReaderOneClient extends Thread { private final boolean isWorker; + private Trace currentlyOpenTrace; + public TCPReaderOneClient(final SocketChannel socketChannel, final boolean isWorker, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; @@ -104,253 +106,283 @@ class TCPReaderOneClient extends Thread { } private final void messagesfromByteArray(final ByteBuffer buffer) { - while (buffer.remaining() > 0) { - final byte clazzId = buffer.get(); - switch (clazzId) { - case HostApplicationMetaDataRecord.CLAZZ_ID: { - if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) { - readInHostApplicationMetaData(buffer); - break; - } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeOperationEvent(buffer); - break; + boolean shouldProceed = true; + if (currentlyOpenTrace != null) { + shouldProceed = readInTraceRecordChunks(buffer); + } + + if (shouldProceed) { + while (buffer.remaining() > 0) { + final byte clazzId = buffer.get(); + switch (clazzId) { + case HostApplicationMetaDataRecord.CLAZZ_ID: { + if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) { + readInHostApplicationMetaData(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterFailedOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterFailedOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterFailedOperationEvent(buffer); - break; + case BeforeOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; + case AfterFailedOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterFailedOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterFailedOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; - } - case AfterOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterOperationEvent(buffer); - break; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case StringRegistryRecord.CLAZZ_ID: { - int mapId = 0; - int stringLength = 0; - if (buffer.remaining() >= 8) { - mapId = buffer.getInt(); - stringLength = buffer.getInt(); - } else { + case AfterOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterOperationEvent(buffer); + break; + } buffer.position(buffer.position() - 1); buffer.compact(); return; } + case StringRegistryRecord.CLAZZ_ID: { + int mapId = 0; + int stringLength = 0; + if (buffer.remaining() >= 8) { + mapId = buffer.getInt(); + stringLength = buffer.getInt(); + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } - if (buffer.remaining() >= stringLength) { - final byte[] stringByteArray = new byte[stringLength]; + if (buffer.remaining() >= stringLength) { + final byte[] stringByteArray = new byte[stringLength]; - buffer.get(stringByteArray); + buffer.get(stringByteArray); - stringRegistry.putStringRecord(mapId, new String(stringByteArray)); + stringRegistry.putStringRecord(mapId, new String(stringByteArray)); - checkWaitingMessages(); - } else { - buffer.position(buffer.position() - 9); - buffer.compact(); - return; + checkWaitingMessages(); + } else { + buffer.position(buffer.position() - 9); + buffer.compact(); + return; + } + break; } - break; - } - case SystemMonitoringRecord.CLAZZ_ID: { - if (isWorker) { + case SystemMonitoringRecord.CLAZZ_ID: { + // if (isWorker) { if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) { readInSystemMonitoringRecord(buffer); break; } - } else { - if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) { - try { - SystemMonitoringRecord.createFromByteBuffer(buffer, stringRegistry); - } catch (final IdNotAvailableException e) { - // should not happen - e.printStackTrace(); - } - break; - } + // } else { + // if (buffer.remaining() >= + // SystemMonitoringRecord.BYTE_LENGTH) { + // try { + // putInRingBuffer(SystemMonitoringRecord.createFromByteBuffer( + // buffer, stringRegistry)); + // } catch (final IdNotAvailableException e) { + // // should not happen + // e.printStackTrace(); + // } + // break; + // } + // } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case Trace.CLAZZ_ID: { - if (buffer.remaining() >= Trace.BYTE_LENGTH) { - final byte validByte = buffer.get(); - boolean valid = true; - if (validByte == (byte) 0) { - valid = false; - } - final byte containsRemoteRecordByte = buffer.get(); - boolean containsRemoteRecord = true; - if (containsRemoteRecordByte == (byte) 0) { - containsRemoteRecord = false; - } - final int calledTimes = buffer.getInt(); - final int eventsLength = buffer.getInt(); - final int byteLength = buffer.getInt(); + case Trace.CLAZZ_ID: { + if (buffer.remaining() >= Trace.BYTE_LENGTH) { + final byte validByte = buffer.get(); + boolean valid = true; + if (validByte == (byte) 0) { + valid = false; + } + final byte containsRemoteRecordByte = buffer.get(); + boolean containsRemoteRecord = true; + if (containsRemoteRecordByte == (byte) 0) { + containsRemoteRecord = false; + } + final int calledTimes = buffer.getInt(); + final int eventsLength = buffer.getInt(); - if ((buffer.remaining() >= byteLength) && (eventsLength > 0)) { final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( eventsLength); - for (int i = 0; i < eventsLength; i++) { - try { - events.add(AbstractEventRecord.createFromByteBuffer(buffer, - stringRegistry)); - } catch (final IdNotAvailableException e) { - // should not happen - e.printStackTrace(); - } - } + currentlyOpenTrace = new Trace(events, valid, containsRemoteRecord, + calledTimes, eventsLength); - putInRingBuffer(new Trace(events, valid, containsRemoteRecord, - calledTimes)); + if (!readInTraceRecordChunks(buffer)) { + return; + } break; } - buffer.position(buffer.position() - 15); + buffer.position(buffer.position() - 1); buffer.compact(); return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeConstructorEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeConstructorEvent(buffer); - break; + case BeforeConstructorEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeConstructorEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterFailedConstructorEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterFailedConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterFailedConstructorEvent(buffer); - break; + case AfterFailedConstructorEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterFailedConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterFailedConstructorEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterConstructorEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterConstructorEvent(buffer); - break; + case AfterConstructorEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterConstructorEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterConstructorEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeReceivedRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeReceivedRemoteCallEvent(buffer); - break; + case BeforeReceivedRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeReceivedRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeStaticOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeStaticOperationEvent(buffer); - break; + case BeforeStaticOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeStaticOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterFailedStaticOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterFailedStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterFailedStaticOperationEvent(buffer); - break; + case AfterFailedStaticOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterFailedStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterFailedStaticOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterStaticOperationEventRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterStaticOperationEvent(buffer); - break; + case AfterStaticOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterStaticOperationEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeSentRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeSentRemoteCallEvent(buffer); - break; + case BeforeSentRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeSentRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterSentRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterSentRemoteCallEvent(buffer); - break; + case AfterSentRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterSentRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case BeforeUnknownReceivedRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= BeforeUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInBeforeUnknownReceivedRemoteCallEvent(buffer); - break; + case BeforeUnknownReceivedRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= BeforeUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInBeforeUnknownReceivedRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; - } - case AfterUnknownReceivedRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterUnknownReceivedRemoteCallEvent(buffer); - break; + case AfterUnknownReceivedRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterUnknownReceivedRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + case AfterReceivedRemoteCallRecord.CLAZZ_ID: { + if (buffer.remaining() >= AfterReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { + readInAfterReceivedRemoteCallEvent(buffer); + break; + } + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + default: { + System.out.println("unknown class id " + clazzId + " at offset " + + (buffer.position() - 1)); + buffer.clear(); + return; } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; } - case AfterReceivedRemoteCallRecord.CLAZZ_ID: { - if (buffer.remaining() >= AfterReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) { - readInAfterReceivedRemoteCallEvent(buffer); - break; + } + + buffer.clear(); + } + } + + private boolean readInTraceRecordChunks(final ByteBuffer buffer) { + if (buffer.remaining() >= (4 + 4)) { + int recordAmountNowComming = buffer.getInt(); + final int bytesComming = buffer.getInt(); + + if (buffer.remaining() >= bytesComming) { + while (recordAmountNowComming > 0) { + try { + currentlyOpenTrace.getTraceEvents().add( + AbstractEventRecord.createFromByteBuffer(buffer, stringRegistry)); + } catch (final IdNotAvailableException e) { + // should not happen + e.printStackTrace(); } - buffer.position(buffer.position() - 1); - buffer.compact(); - return; + recordAmountNowComming--; } - default: { - System.out.println("unknown class id " + clazzId + " at offset " - + (buffer.position() - 1)); - buffer.clear(); - return; + + if (currentlyOpenTrace.getTraceEvents().size() == currentlyOpenTrace + .getEventsLength()) { + putInRingBuffer(currentlyOpenTrace); + currentlyOpenTrace = null; + return true; } + } else { + buffer.position(buffer.position() - 4 - 4); + buffer.compact(); } + } else { + buffer.compact(); } - - buffer.clear(); + return false; } private final void readInHostApplicationMetaData(final ByteBuffer buffer) {