Skip to content
Snippets Groups Projects
Commit 96ea03b0 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

optimizations

parent 045dbbad
No related branches found
No related tags found
No related merge requests found
package explorviz.hpc_monitoring.filter.reconstruction; package explorviz.hpc_monitoring.filter.reconstruction;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.*; import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
public class TraceBuffer { public class TraceBuffer {
private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator();
private TraceMetadata traceMetadata; private HostApplicationMetadata traceMetadata;
private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>(
COMPARATOR); COMPARATOR);
...@@ -34,17 +39,15 @@ public class TraceBuffer { ...@@ -34,17 +39,15 @@ public class TraceBuffer {
closeable = true; closeable = true;
} }
openEvents++; openEvents++;
} } else if (event instanceof AfterOperationEvent) {
else if (event instanceof AfterOperationEvent) {
openEvents--; openEvents--;
} } else if (event instanceof AfterFailedOperationEvent) {
else if (event instanceof AfterFailedOperationEvent) {
openEvents--; openEvents--;
} }
if (!events.add(event)) { if (!events.add(event)) {
System.out.println("Duplicate entry for orderIndex " + orderIndex // System.out.println("Duplicate entry for orderIndex " + orderIndex
+ " with traceId " + traceMetadata.getTraceId()); // + " with traceId " + traceMetadata.getTraceId());
damaged = true; damaged = true;
} }
} }
...@@ -64,10 +67,10 @@ public class TraceBuffer { ...@@ -64,10 +67,10 @@ public class TraceBuffer {
return orderIndex; return orderIndex;
} }
public void setTrace(final TraceMetadata trace) { public void setTrace(final HostApplicationMetadata trace) {
if (traceMetadata != null) { if (traceMetadata != null) {
System.out.println("Duplicate Trace entry for traceId " // System.out.println("Duplicate Trace entry for traceId "
+ trace.getTraceId()); // + trace.getTraceId());
damaged = true; damaged = true;
return; return;
} }
......
...@@ -12,9 +12,9 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; ...@@ -12,9 +12,9 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimeReader; import explorviz.hpc_monitoring.reader.TimeReader;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import gnu.trove.iterator.TLongObjectIterator; import gnu.trove.iterator.TLongObjectIterator;
import gnu.trove.map.hash.TLongObjectHashMap; import gnu.trove.map.hash.TLongObjectHashMap;
...@@ -40,8 +40,8 @@ public final class TraceReconstructionFilter implements ...@@ -40,8 +40,8 @@ public final class TraceReconstructionFilter implements
RecordEvent.EVENT_FACTORY, 16384, exec); RecordEvent.EVENT_FACTORY, 16384, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TracePatternSummarizationFilter( eventHandlers[0] = new TracePatternSummarizationFilter(2 * 1000 * 1000,
2 * 1000 * 1000 * 1000, endReceiver); endReceiver);
disruptor.handleEventsWith(eventHandlers); disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start(); ringBuffer = disruptor.start();
...@@ -67,14 +67,14 @@ public final class TraceReconstructionFilter implements ...@@ -67,14 +67,14 @@ public final class TraceReconstructionFilter implements
private void sendOutValidTrace(final Trace trace) { private void sendOutValidTrace(final Trace trace) {
counter.inputObjects(trace); counter.inputObjects(trace);
putInRingBuffer(trace); // putInRingBuffer(trace);
} }
private void sendOutInvalidTrace(final Trace trace) { private void sendOutInvalidTrace(final Trace trace) {
// counter.inputObjects(trace); // counter.inputObjects(trace);
// putInRingBuffer(trace); // TODO // putInRingBuffer(trace); // TODO
System.out.println("Invalid trace: " // System.out.println("Invalid trace: "
+ trace.getTraceMetadata().getTraceId()); // + trace.getTraceMetadata().getTraceId());
} }
private void putInRingBuffer(final IRecord record) { private void putInRingBuffer(final IRecord record) {
...@@ -88,12 +88,12 @@ public final class TraceReconstructionFilter implements ...@@ -88,12 +88,12 @@ public final class TraceReconstructionFilter implements
public void onEvent(final RecordEvent event, final long sequence, public void onEvent(final RecordEvent event, final long sequence,
final boolean endOfBatch) throws Exception { final boolean endOfBatch) throws Exception {
final IRecord record = event.getValue(); final IRecord record = event.getValue();
if (record instanceof TraceMetadata) { if (record instanceof HostApplicationMetadata) {
final TraceMetadata traceMetadata = ((TraceMetadata) record); final HostApplicationMetadata traceMetadata = ((HostApplicationMetadata) record);
final long traceId = traceMetadata.getTraceId(); // final long traceId = traceMetadata.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(traceId); // final TraceBuffer traceBuffer = getBufferForTraceId(traceId);
traceBuffer.setTrace(traceMetadata); // traceBuffer.setTrace(traceMetadata);
} else if (record instanceof AbstractOperationEvent) { } else if (record instanceof AbstractOperationEvent) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
......
package explorviz.hpc_monitoring.reader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import gnu.trove.map.hash.TIntObjectHashMap;
public class MessageDistributer implements EventHandler<ByteArrayEvent> {
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Records per second");
private final TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>(
64);
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(
1024);
private byte[] unreadBytes = null;
private final RingBuffer<RecordEvent> ringBuffer;
@SuppressWarnings("unchecked")
public MessageDistributer(final EventHandler<RecordEvent> endReceiver) {
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 32768, exec);
ringBuffer = disruptor.start();
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final byte[] received = event.getValue();
final int receivedLength = event.getLength();
byte[] messages = received;
int messagesLength = receivedLength;
if (unreadBytes != null) {
final int unreadBytesLength = unreadBytes.length;
messagesLength += unreadBytesLength;
messages = new byte[messagesLength];
System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength);
System.arraycopy(received, 0, messages, unreadBytesLength,
receivedLength);
}
unreadBytes = messagesfromByteArray(messages, messagesLength);
}
private byte[] messagesfromByteArray(final byte[] b, final int readSize) {
int offset = 0;
while (offset < readSize) {
if ((readSize - offset) < 4) {
return createUnreadBytesArray(b, readSize, offset, false);
}
final int clazzId = UnsafeBits.getInt(b, offset);
offset += 4;
switch (clazzId) {
case TraceMetadata.CLAZZ_ID: {
if ((readSize - offset) < TraceMetadata.BYTE_LENGTH) {
return createUnreadBytesArray(b, readSize, offset, true);
}
offset = readInTraceMetadata(b, offset);
break;
}
case BeforeOperationEvent.CLAZZ_ID: {
if ((readSize - offset) < BeforeOperationEvent.BYTE_LENGTH) {
return createUnreadBytesArray(b, readSize, offset, true);
}
offset = readInBeforeOperationEvent(b, offset);
break;
}
case AfterFailedOperationEvent.CLAZZ_ID: {
if ((readSize - offset) < AfterFailedOperationEvent.BYTE_LENGTH) {
return createUnreadBytesArray(b, readSize, offset, true);
}
offset = readInAfterFailedOperationEvent(b, offset);
break;
}
case AfterOperationEvent.CLAZZ_ID: {
if ((readSize - offset) < AfterOperationEvent.BYTE_LENGTH) {
return createUnreadBytesArray(b, readSize, offset, true);
}
offset = readInAfterOperationEvent(b, offset);
break;
}
case 4: {
if ((readSize - offset) < (4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final int mapId = UnsafeBits.getInt(b, offset);
offset += 4;
final int stringLength = UnsafeBits.getInt(b, offset);
offset += 4;
if ((readSize - offset) < stringLength) {
return createUnreadBytesArray(b, readSize, offset - 8,
true);
}
final byte[] stringBytes = new byte[stringLength];
System.arraycopy(b, offset, stringBytes, 0, stringLength);
final String string = new String(stringBytes);
offset += stringLength;
addToRegistry(mapId, string);
break;
}
default: {
System.out.println("unknown class id " + clazzId
+ " at offset " + (offset - 4));
return null;
}
}
}
return null;
}
private int readInTraceMetadata(final byte[] b, int offset) {
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int hostnameId = UnsafeBits.getInt(b, offset);
offset += 4;
final long parentTraceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int parentOrderId = UnsafeBits.getInt(b, offset);
offset += 4;
final int applicationId = UnsafeBits.getInt(b, offset);
offset += 4;
final String hostname = getStringFromRegistry(hostnameId);
final String application = getStringFromRegistry(applicationId);
if ((hostname != null) && (application != null)) {
putInRingBuffer(new TraceMetadata(traceId, hostname, parentTraceId,
parentOrderId, application));
} else {
final byte[] message = new byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
System.arraycopy(b, offset
- TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0,
TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
putInWaitingMessages(message);
}
return offset;
}
private int readInBeforeOperationEvent(final byte[] b, int offset) {
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final int operationId = UnsafeBits.getInt(b, offset);
offset += 4;
final String operation = getStringFromRegistry(operationId);
if (operation != null) {
putInRingBuffer(new BeforeOperationEvent(timestamp, traceId,
orderIndex, operation));
} else {
final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
System.arraycopy(b, offset
- BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
putInWaitingMessages(message);
}
return offset;
}
private int readInAfterFailedOperationEvent(final byte[] b, int offset) {
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final int operationId = UnsafeBits.getInt(b, offset);
offset += 4;
final int causeId = UnsafeBits.getInt(b, offset);
offset += 4;
final String operation = getStringFromRegistry(operationId);
final String cause = getStringFromRegistry(causeId);
if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, operation, cause));
} else {
final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
System.arraycopy(b, offset
- AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID,
message, 0,
AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
putInWaitingMessages(message);
}
return offset;
}
private int readInAfterOperationEvent(final byte[] b, int offset) {
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final int operationId = UnsafeBits.getInt(b, offset);
offset += 4;
final String operation = getStringFromRegistry(operationId);
if (operation != null) {
putInRingBuffer(new AfterOperationEvent(timestamp, traceId,
orderIndex, operation));
} else {
final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
System.arraycopy(b, offset
- AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
putInWaitingMessages(message);
}
return offset;
}
private void putInWaitingMessages(final byte[] message) {
waitingForStringMessages.add(message);
}
private byte[] createUnreadBytesArray(final byte[] b, final int readSize,
int offset, final boolean withClazzId) {
if (withClazzId) {
offset -= 4;
}
final int unreadBytesSize = readSize - offset;
final byte[] unreadBytes = new byte[unreadBytesSize];
System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize);
return unreadBytes;
}
private void putInRingBuffer(final IRecord record) {
counter.inputObjects(record);
// final long hiseq = ringBuffer.next();
// final RecordEvent valueEvent = ringBuffer.get(hiseq);
// valueEvent.setValue(record);
// ringBuffer.publish(hiseq);
}
public void addToRegistry(final int key, final String value) {
stringRegistry.put(key, value);
// System.out.println("put key " + key + " value " + value);
checkWaitingMessages();
}
private void checkWaitingMessages() {
final List<byte[]> localWaitingList = new ArrayList<byte[]>();
for (final byte[] waitingMessage : waitingForStringMessages) {
localWaitingList.add(waitingMessage);
}
waitingForStringMessages.clear();
for (final byte[] waitingMessage : localWaitingList) {
final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage,
0);
switch (waitingMessageClazzId) {
case TraceMetadata.CLAZZ_ID:
readInTraceMetadata(waitingMessage, 4);
break;
case BeforeOperationEvent.CLAZZ_ID:
readInBeforeOperationEvent(waitingMessage, 4);
break;
case AfterFailedOperationEvent.CLAZZ_ID:
readInAfterFailedOperationEvent(waitingMessage, 4);
break;
case AfterOperationEvent.CLAZZ_ID:
readInAfterOperationEvent(waitingMessage, 4);
break;
default:
break;
}
}
}
private String getStringFromRegistry(final int id) {
return stringRegistry.get(id);
}
}
...@@ -2,9 +2,12 @@ package explorviz.hpc_monitoring.reader; ...@@ -2,9 +2,12 @@ package explorviz.hpc_monitoring.reader;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -12,21 +15,37 @@ import com.lmax.disruptor.EventHandler; ...@@ -12,21 +15,37 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import gnu.trove.map.hash.TIntObjectHashMap;
public final class TCPReader { public final class TCPReader {
private static final int MESSAGE_BUFFER_SIZE = 131072; private static final int MESSAGE_BUFFER_SIZE = 131072;
private final int listeningPort; private final int listeningPort;
private ServerSocketChannel serversocket; private ServerSocketChannel serversocket;
private boolean active = true; private boolean active = true;
private final RingBuffer<ByteArrayEvent> ringBuffer; private final RingBuffer<RecordEvent> ringBuffer;
private final ByteBuffer buffer; private final ByteBuffer buffer;
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Records per second");
private final static TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>(
64);
private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(
1024);
private HostApplicationMetadata hostApplicationMetadata;
public TCPReader(final int listeningPort, public TCPReader(final int listeningPort,
final EventHandler<RecordEvent> endReceiver) final EventHandler<RecordEvent> endReceiver)
throws IllegalArgumentException { throws IllegalArgumentException {
...@@ -35,12 +54,13 @@ public final class TCPReader { ...@@ -35,12 +54,13 @@ public final class TCPReader {
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
final ExecutorService exec = Executors.newCachedThreadPool(); final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
ByteArrayEvent.EVENT_FACTORY, 32, exec); RecordEvent.EVENT_FACTORY, 32768, exec);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new MessageDistributer(endReceiver); eventHandlers[0] = new TraceReconstructionFilter(1 * 1000 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers); disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start(); ringBuffer = disruptor.start();
} }
...@@ -51,13 +71,9 @@ public final class TCPReader { ...@@ -51,13 +71,9 @@ public final class TCPReader {
while (active) { while (active) {
// TODO only one connection! // TODO only one connection!
final SocketChannel socketChannel = serversocket.accept(); final SocketChannel socketChannel = serversocket.accept();
int readBytes = 0; while ((socketChannel.read(buffer)) != -1) {
while ((readBytes = socketChannel.read(buffer)) != -1) {
final byte[] messages = new byte[readBytes];
buffer.flip(); buffer.flip();
buffer.get(messages, 0, readBytes); messagesfromByteArray(buffer);
putInRingBuffer(messages, readBytes);
buffer.clear();
} }
serversocket.close(); serversocket.close();
...@@ -79,16 +95,197 @@ public final class TCPReader { ...@@ -79,16 +95,197 @@ public final class TCPReader {
System.out.println("listening on port " + listeningPort); System.out.println("listening on port " + listeningPort);
} }
private void putInRingBuffer(final byte[] messages, final int readBytes) { private void putInRingBuffer(final IRecord message) {
final long hiseq = ringBuffer.next(); counter.inputObjects(message);
final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); // System.out.println(message.toString());
valueEvent.setValue(messages); // final long hiseq = ringBuffer.next();
valueEvent.setLength(readBytes); // final RecordEvent valueEvent = ringBuffer.get(hiseq);
ringBuffer.publish(hiseq); // valueEvent.setValue(message);
// ringBuffer.publish(hiseq);
} }
public void terminate(final boolean error) { public void terminate(final boolean error) {
System.out.println("Shutdown of TCPReader requested."); System.out.println("Shutdown of TCPReader requested.");
active = false; active = false;
} }
public void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
buffer.mark();
try {
final byte clazzId = buffer.get();
switch (clazzId) {
case HostApplicationMetadata.CLAZZ_ID: {
readInTraceMetadata(buffer);
break;
}
case BeforeOperationEvent.CLAZZ_ID: {
readInBeforeOperationEvent(buffer);
break;
}
case AfterFailedOperationEvent.CLAZZ_ID: {
readInAfterFailedOperationEvent(buffer);
break;
}
case AfterOperationEvent.CLAZZ_ID: {
readInAfterOperationEvent(buffer);
break;
}
case 4: {
final int mapId = buffer.getInt();
final int stringLength = buffer.getInt();
final byte[] stringByteArray = new byte[stringLength];
buffer.get(stringByteArray);
addToRegistry(mapId, new String(stringByteArray));
break;
}
default: {
System.out.println("unknown class id " + clazzId
+ " at offset " + (buffer.position() - 4));
}
}
} catch (final BufferUnderflowException e) {
buffer.reset();
buffer.compact();
return;
}
}
buffer.clear();
}
private final void readInTraceMetadata(final ByteBuffer buffer) {
final int hostnameId = buffer.getInt();
final int applicationId = buffer.getInt();
final String hostname = getStringFromRegistry(hostnameId);
final String application = getStringFromRegistry(applicationId);
// if ((hostname != null) && (application != null)) {
hostApplicationMetadata = new HostApplicationMetadata(hostname,
application);
// } else {
// final byte[] message = new
// byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0,
// TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
}
private final void readInBeforeOperationEvent(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt();
final String operation = getStringFromRegistry(operationId);
// if (operation != null) {
putInRingBuffer(new BeforeOperationEvent(timestamp, traceId,
orderIndex, operation));
// } else {
// final byte[] message = new
// byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
// 0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
}
private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt();
final int causeId = buffer.getInt();
final String operation = getStringFromRegistry(operationId);
final String cause = getStringFromRegistry(causeId);
// if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, operation, cause));
// } else {
// final byte[] message = new
// byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID,
// message, 0,
// AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
}
private final void readInAfterOperationEvent(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt();
final String operation = getStringFromRegistry(operationId);
// if (operation != null) {
putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex,
operation));
// } else {
// final byte[] message = new
// byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
// 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
}
private void putInWaitingMessages(final byte[] message) {
waitingForStringMessages.add(message);
}
private void checkWaitingMessages() {
final List<byte[]> localWaitingList = new ArrayList<byte[]>();
for (final byte[] waitingMessage : waitingForStringMessages) {
localWaitingList.add(waitingMessage);
}
waitingForStringMessages.clear();
// for (final byte[] waitingMessage : localWaitingList) {
// final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage,
// 0);
// switch (waitingMessageClazzId) {
// case TraceMetadata.CLAZZ_ID:
// readInTraceMetadata(waitingMessage, 4);
// break;
// case BeforeOperationEvent.CLAZZ_ID:
// readInBeforeOperationEvent(waitingMessage, 4);
// break;
// case AfterFailedOperationEvent.CLAZZ_ID:
// readInAfterFailedOperationEvent(waitingMessage, 4);
// break;
// case AfterOperationEvent.CLAZZ_ID:
// readInAfterOperationEvent(waitingMessage, 4);
// break;
// default:
// break;
// }
// }
}
public void addToRegistry(final int key, final String value) {
stringRegistry.put(key, value);
// System.out.println("put key " + key + " value " + value);
checkWaitingMessages();
}
private String getStringFromRegistry(final int id) {
return stringRegistry.get(id);
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment