Skip to content
Snippets Groups Projects
Commit 0f08efe0 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

trace reading

parent b6f1f87f
No related branches found
No related tags found
No related merge requests found
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
</launchConfiguration>
......@@ -16,10 +16,12 @@ import explorviz.live_trace_processing.record.ISerializableRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.writer.IRecordSender;
import explorviz.live_trace_processing.writer.IStringRecordSender;
import explorviz.live_trace_processing.writer.IWriter;
public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender {
public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender,
IRecordSender {
private URL providerURL;
private SocketChannel socketChannel;
......@@ -66,7 +68,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
@Override
public void sendOutStringRecord(final StringRegistryRecord record) {
final ByteBuffer stringBuffer = ByteBuffer.allocateDirect(record.getRecordSizeInBytes());
record.putIntoByteBuffer(stringBuffer, stringRegistry);
record.putIntoByteBuffer(stringBuffer, stringRegistry, this);
send(stringBuffer);
}
......@@ -79,7 +81,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
System.out.println("required: " + serializableRecord.getRecordSizeInBytes()
+ " and capacity is " + buffer.capacity());
}
serializableRecord.putIntoByteBuffer(buffer, stringRegistry);
serializableRecord.putIntoByteBuffer(buffer, stringRegistry, this);
} else if (record instanceof TimedPeriodRecord) {
if (buffer.hasRemaining()) {
send(buffer);
......@@ -89,7 +91,8 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
}
}
private void send(final ByteBuffer buffer) {
@Override
public void send(final ByteBuffer buffer) {
while ((socketChannel == null) || (!socketChannel.isConnected())) {
try {
Thread.sleep(1);
......
......@@ -99,6 +99,7 @@ class TraceReconstructionBuffer {
}
}
}
return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord, 1);
return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord,
1, events.size());
}
}
......@@ -62,6 +62,8 @@ class TCPReaderOneClient extends Thread {
private final boolean isWorker;
private Trace currentlyOpenTrace;
public TCPReaderOneClient(final SocketChannel socketChannel, final boolean isWorker,
final RingBuffer<RecordArrayEvent> ringBuffer) {
this.socketChannel = socketChannel;
......@@ -104,253 +106,283 @@ class TCPReaderOneClient extends Thread {
}
private final void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
final byte clazzId = buffer.get();
switch (clazzId) {
case HostApplicationMetaDataRecord.CLAZZ_ID: {
if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) {
readInHostApplicationMetaData(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeOperationEvent(buffer);
break;
boolean shouldProceed = true;
if (currentlyOpenTrace != null) {
shouldProceed = readInTraceRecordChunks(buffer);
}
if (shouldProceed) {
while (buffer.remaining() > 0) {
final byte clazzId = buffer.get();
switch (clazzId) {
case HostApplicationMetaDataRecord.CLAZZ_ID: {
if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) {
readInHostApplicationMetaData(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterFailedOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedOperationEvent(buffer);
break;
case BeforeOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
case AfterFailedOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case StringRegistryRecord.CLAZZ_ID: {
int mapId = 0;
int stringLength = 0;
if (buffer.remaining() >= 8) {
mapId = buffer.getInt();
stringLength = buffer.getInt();
} else {
case AfterOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case StringRegistryRecord.CLAZZ_ID: {
int mapId = 0;
int stringLength = 0;
if (buffer.remaining() >= 8) {
mapId = buffer.getInt();
stringLength = buffer.getInt();
} else {
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
if (buffer.remaining() >= stringLength) {
final byte[] stringByteArray = new byte[stringLength];
if (buffer.remaining() >= stringLength) {
final byte[] stringByteArray = new byte[stringLength];
buffer.get(stringByteArray);
buffer.get(stringByteArray);
stringRegistry.putStringRecord(mapId, new String(stringByteArray));
stringRegistry.putStringRecord(mapId, new String(stringByteArray));
checkWaitingMessages();
} else {
buffer.position(buffer.position() - 9);
buffer.compact();
return;
checkWaitingMessages();
} else {
buffer.position(buffer.position() - 9);
buffer.compact();
return;
}
break;
}
break;
}
case SystemMonitoringRecord.CLAZZ_ID: {
if (isWorker) {
case SystemMonitoringRecord.CLAZZ_ID: {
// if (isWorker) {
if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) {
readInSystemMonitoringRecord(buffer);
break;
}
} else {
if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) {
try {
SystemMonitoringRecord.createFromByteBuffer(buffer, stringRegistry);
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
}
break;
}
// } else {
// if (buffer.remaining() >=
// SystemMonitoringRecord.BYTE_LENGTH) {
// try {
// putInRingBuffer(SystemMonitoringRecord.createFromByteBuffer(
// buffer, stringRegistry));
// } catch (final IdNotAvailableException e) {
// // should not happen
// e.printStackTrace();
// }
// break;
// }
// }
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case Trace.CLAZZ_ID: {
if (buffer.remaining() >= Trace.BYTE_LENGTH) {
final byte validByte = buffer.get();
boolean valid = true;
if (validByte == (byte) 0) {
valid = false;
}
final byte containsRemoteRecordByte = buffer.get();
boolean containsRemoteRecord = true;
if (containsRemoteRecordByte == (byte) 0) {
containsRemoteRecord = false;
}
final int calledTimes = buffer.getInt();
final int eventsLength = buffer.getInt();
final int byteLength = buffer.getInt();
case Trace.CLAZZ_ID: {
if (buffer.remaining() >= Trace.BYTE_LENGTH) {
final byte validByte = buffer.get();
boolean valid = true;
if (validByte == (byte) 0) {
valid = false;
}
final byte containsRemoteRecordByte = buffer.get();
boolean containsRemoteRecord = true;
if (containsRemoteRecordByte == (byte) 0) {
containsRemoteRecord = false;
}
final int calledTimes = buffer.getInt();
final int eventsLength = buffer.getInt();
if ((buffer.remaining() >= byteLength) && (eventsLength > 0)) {
final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
eventsLength);
for (int i = 0; i < eventsLength; i++) {
try {
events.add(AbstractEventRecord.createFromByteBuffer(buffer,
stringRegistry));
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
}
}
currentlyOpenTrace = new Trace(events, valid, containsRemoteRecord,
calledTimes, eventsLength);
putInRingBuffer(new Trace(events, valid, containsRemoteRecord,
calledTimes));
if (!readInTraceRecordChunks(buffer)) {
return;
}
break;
}
buffer.position(buffer.position() - 15);
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeConstructorEvent(buffer);
break;
case BeforeConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeConstructorEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterFailedConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedConstructorEvent(buffer);
break;
case AfterFailedConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedConstructorEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterConstructorEvent(buffer);
break;
case AfterConstructorEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterConstructorEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterConstructorEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeReceivedRemoteCallEvent(buffer);
break;
case BeforeReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeReceivedRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeStaticOperationEvent(buffer);
break;
case BeforeStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeStaticOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterFailedStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedStaticOperationEvent(buffer);
break;
case AfterFailedStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterFailedStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterFailedStaticOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterStaticOperationEvent(buffer);
break;
case AfterStaticOperationEventRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterStaticOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeSentRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeSentRemoteCallEvent(buffer);
break;
case BeforeSentRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeSentRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterSentRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterSentRemoteCallEvent(buffer);
break;
case AfterSentRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterSentRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterSentRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeUnknownReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeUnknownReceivedRemoteCallEvent(buffer);
break;
case BeforeUnknownReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= BeforeUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInBeforeUnknownReceivedRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterUnknownReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterUnknownReceivedRemoteCallEvent(buffer);
break;
case AfterUnknownReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterUnknownReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterUnknownReceivedRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterReceivedRemoteCallEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
default: {
System.out.println("unknown class id " + clazzId + " at offset "
+ (buffer.position() - 1));
buffer.clear();
return;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterReceivedRemoteCallRecord.CLAZZ_ID: {
if (buffer.remaining() >= AfterReceivedRemoteCallRecord.COMPRESSED_BYTE_LENGTH) {
readInAfterReceivedRemoteCallEvent(buffer);
break;
}
buffer.clear();
}
}
private boolean readInTraceRecordChunks(final ByteBuffer buffer) {
if (buffer.remaining() >= (4 + 4)) {
int recordAmountNowComming = buffer.getInt();
final int bytesComming = buffer.getInt();
if (buffer.remaining() >= bytesComming) {
while (recordAmountNowComming > 0) {
try {
currentlyOpenTrace.getTraceEvents().add(
AbstractEventRecord.createFromByteBuffer(buffer, stringRegistry));
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
recordAmountNowComming--;
}
default: {
System.out.println("unknown class id " + clazzId + " at offset "
+ (buffer.position() - 1));
buffer.clear();
return;
if (currentlyOpenTrace.getTraceEvents().size() == currentlyOpenTrace
.getEventsLength()) {
putInRingBuffer(currentlyOpenTrace);
currentlyOpenTrace = null;
return true;
}
} else {
buffer.position(buffer.position() - 4 - 4);
buffer.compact();
}
} else {
buffer.compact();
}
buffer.clear();
return false;
}
private final void readInHostApplicationMetaData(final ByteBuffer buffer) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment