diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index c1cc32fa79b2b10411d61348dadc79e8169c4968..a220139d24e3d31d2407052cd17fad3e175c1745 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -1,112 +1,115 @@ package explorviz.hpc_monitoring.filter.reconstruction; import java.io.Serializable; -import java.util.*; +import java.util.Comparator; +import java.util.Iterator; +import java.util.TreeSet; + +import explorviz.hpc_monitoring.record.HostApplicationMetadata; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.TraceMetadata; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.*; +import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; public class TraceBuffer { - private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); - - private TraceMetadata traceMetadata; - private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( - COMPARATOR); - - private boolean closeable; - private boolean damaged; - - private int openEvents; - private int maxOrderIndex = -1; - private long maxLoggingTimestamp = -1; - - public final long getMaxLoggingTimestamp() { - return maxLoggingTimestamp; - } - - public final void insertEvent(final AbstractOperationEvent event) { - setMaxLoggingTimestamp(event); - final int orderIndex = setMaxOrderIndex(event); - - if (event instanceof BeforeOperationEvent) { - if (orderIndex == 0) { - closeable = true; - } - openEvents++; - } - else if (event instanceof AfterOperationEvent) { - openEvents--; - } - else if (event instanceof AfterFailedOperationEvent) { - openEvents--; - } - - if (!events.add(event)) { - System.out.println("Duplicate entry for orderIndex " + orderIndex - + " with traceId " + traceMetadata.getTraceId()); - damaged = true; - } - } - - private final void setMaxLoggingTimestamp(final AbstractOperationEvent event) { - final long loggingTimestamp = event.getLoggingTimestamp(); - if (loggingTimestamp > maxLoggingTimestamp) { - maxLoggingTimestamp = loggingTimestamp; - } - } - - private final int setMaxOrderIndex(final AbstractOperationEvent event) { - final int orderIndex = event.getOrderIndex(); - if (orderIndex > maxOrderIndex) { - maxOrderIndex = orderIndex; - } - return orderIndex; - } - - public void setTrace(final TraceMetadata trace) { - if (traceMetadata != null) { - System.out.println("Duplicate Trace entry for traceId " - + trace.getTraceId()); - damaged = true; - return; - } - traceMetadata = trace; - } - - public final boolean isFinished() { - return !isInvalid() && closeable; - } - - public final boolean isInvalid() { - return (((maxOrderIndex + 1) != events.size()) || events.isEmpty() - || (openEvents != 0) || (traceMetadata == null) || damaged); - } - - public final Trace toTrace() { // TODO still slow? - final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events - .size()]; - final Iterator<AbstractOperationEvent> iterator = events.iterator(); - int index = 0; - while (iterator.hasNext()) { - arrayEvents[index] = iterator.next(); - index++; - } - - return new Trace(traceMetadata, arrayEvents); - } - - /** - * @author Jan Waller - */ - private static final class AbstractOperationEventComperator implements - Comparator<AbstractOperationEvent>, Serializable { - private static final long serialVersionUID = 8920737343446332517L; - - @Override - public int compare(final AbstractOperationEvent o1, - final AbstractOperationEvent o2) { - return o1.getOrderIndex() - o2.getOrderIndex(); - } - } + private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); + + private HostApplicationMetadata traceMetadata; + private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( + COMPARATOR); + + private boolean closeable; + private boolean damaged; + + private int openEvents; + private int maxOrderIndex = -1; + private long maxLoggingTimestamp = -1; + + public final long getMaxLoggingTimestamp() { + return maxLoggingTimestamp; + } + + public final void insertEvent(final AbstractOperationEvent event) { + setMaxLoggingTimestamp(event); + final int orderIndex = setMaxOrderIndex(event); + + if (event instanceof BeforeOperationEvent) { + if (orderIndex == 0) { + closeable = true; + } + openEvents++; + } else if (event instanceof AfterOperationEvent) { + openEvents--; + } else if (event instanceof AfterFailedOperationEvent) { + openEvents--; + } + + if (!events.add(event)) { + // System.out.println("Duplicate entry for orderIndex " + orderIndex + // + " with traceId " + traceMetadata.getTraceId()); + damaged = true; + } + } + + private final void setMaxLoggingTimestamp(final AbstractOperationEvent event) { + final long loggingTimestamp = event.getLoggingTimestamp(); + if (loggingTimestamp > maxLoggingTimestamp) { + maxLoggingTimestamp = loggingTimestamp; + } + } + + private final int setMaxOrderIndex(final AbstractOperationEvent event) { + final int orderIndex = event.getOrderIndex(); + if (orderIndex > maxOrderIndex) { + maxOrderIndex = orderIndex; + } + return orderIndex; + } + + public void setTrace(final HostApplicationMetadata trace) { + if (traceMetadata != null) { + // System.out.println("Duplicate Trace entry for traceId " + // + trace.getTraceId()); + damaged = true; + return; + } + traceMetadata = trace; + } + + public final boolean isFinished() { + return !isInvalid() && closeable; + } + + public final boolean isInvalid() { + return (((maxOrderIndex + 1) != events.size()) || events.isEmpty() + || (openEvents != 0) || (traceMetadata == null) || damaged); + } + + public final Trace toTrace() { // TODO still slow? + final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events + .size()]; + final Iterator<AbstractOperationEvent> iterator = events.iterator(); + int index = 0; + while (iterator.hasNext()) { + arrayEvents[index] = iterator.next(); + index++; + } + + return new Trace(traceMetadata, arrayEvents); + } + + /** + * @author Jan Waller + */ + private static final class AbstractOperationEventComperator implements + Comparator<AbstractOperationEvent>, Serializable { + private static final long serialVersionUID = 8920737343446332517L; + + @Override + public int compare(final AbstractOperationEvent o1, + final AbstractOperationEvent o2) { + return o1.getOrderIndex() - o2.getOrderIndex(); + } + } } diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index b4c8060d5c1d421848e34f767eed2d922d779d82..ef14c0ec3646d9bcfcf6cf70e4b0ad7c7a9101a9 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -12,9 +12,9 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; import explorviz.hpc_monitoring.reader.TimeReader; +import explorviz.hpc_monitoring.record.HostApplicationMetadata; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.TraceMetadata; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; import gnu.trove.iterator.TLongObjectIterator; import gnu.trove.map.hash.TLongObjectHashMap; @@ -40,8 +40,8 @@ public final class TraceReconstructionFilter implements RecordEvent.EVENT_FACTORY, 16384, exec); final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter( - 2 * 1000 * 1000 * 1000, endReceiver); + eventHandlers[0] = new TracePatternSummarizationFilter(2 * 1000 * 1000, + endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); @@ -67,14 +67,14 @@ public final class TraceReconstructionFilter implements private void sendOutValidTrace(final Trace trace) { counter.inputObjects(trace); - putInRingBuffer(trace); + // putInRingBuffer(trace); } private void sendOutInvalidTrace(final Trace trace) { // counter.inputObjects(trace); // putInRingBuffer(trace); // TODO - System.out.println("Invalid trace: " - + trace.getTraceMetadata().getTraceId()); + // System.out.println("Invalid trace: " + // + trace.getTraceMetadata().getTraceId()); } private void putInRingBuffer(final IRecord record) { @@ -88,12 +88,12 @@ public final class TraceReconstructionFilter implements public void onEvent(final RecordEvent event, final long sequence, final boolean endOfBatch) throws Exception { final IRecord record = event.getValue(); - if (record instanceof TraceMetadata) { - final TraceMetadata traceMetadata = ((TraceMetadata) record); + if (record instanceof HostApplicationMetadata) { + final HostApplicationMetadata traceMetadata = ((HostApplicationMetadata) record); - final long traceId = traceMetadata.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId(traceId); - traceBuffer.setTrace(traceMetadata); + // final long traceId = traceMetadata.getTraceId(); + // final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + // traceBuffer.setTrace(traceMetadata); } else if (record instanceof AbstractOperationEvent) { final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); diff --git a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java deleted file mode 100644 index 249bef8fac048c3a705c05fc5226161a0ee85e81..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java +++ /dev/null @@ -1,319 +0,0 @@ -package explorviz.hpc_monitoring.reader; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; - -import explorviz.hpc_monitoring.byteaccess.UnsafeBits; -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; -import explorviz.hpc_monitoring.disruptor.RecordEvent; -import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; -import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.TraceMetadata; -import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; -import gnu.trove.map.hash.TIntObjectHashMap; - -public class MessageDistributer implements EventHandler<ByteArrayEvent> { - - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Records per second"); - - private final TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>( - 64); - - private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>( - 1024); - - private byte[] unreadBytes = null; - - private final RingBuffer<RecordEvent> ringBuffer; - - @SuppressWarnings("unchecked") - public MessageDistributer(final EventHandler<RecordEvent> endReceiver) { - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, 32768, exec); - - ringBuffer = disruptor.start(); - } - - @Override - public void onEvent(final ByteArrayEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - final byte[] received = event.getValue(); - final int receivedLength = event.getLength(); - - byte[] messages = received; - int messagesLength = receivedLength; - - if (unreadBytes != null) { - final int unreadBytesLength = unreadBytes.length; - - messagesLength += unreadBytesLength; - messages = new byte[messagesLength]; - - System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); - System.arraycopy(received, 0, messages, unreadBytesLength, - receivedLength); - } - - unreadBytes = messagesfromByteArray(messages, messagesLength); - } - - private byte[] messagesfromByteArray(final byte[] b, final int readSize) { - int offset = 0; - - while (offset < readSize) { - if ((readSize - offset) < 4) { - return createUnreadBytesArray(b, readSize, offset, false); - } - - final int clazzId = UnsafeBits.getInt(b, offset); - offset += 4; - - switch (clazzId) { - case TraceMetadata.CLAZZ_ID: { - if ((readSize - offset) < TraceMetadata.BYTE_LENGTH) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - offset = readInTraceMetadata(b, offset); - break; - } - case BeforeOperationEvent.CLAZZ_ID: { - if ((readSize - offset) < BeforeOperationEvent.BYTE_LENGTH) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - offset = readInBeforeOperationEvent(b, offset); - break; - } - case AfterFailedOperationEvent.CLAZZ_ID: { - if ((readSize - offset) < AfterFailedOperationEvent.BYTE_LENGTH) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - offset = readInAfterFailedOperationEvent(b, offset); - break; - } - case AfterOperationEvent.CLAZZ_ID: { - if ((readSize - offset) < AfterOperationEvent.BYTE_LENGTH) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - offset = readInAfterOperationEvent(b, offset); - break; - } - case 4: { - if ((readSize - offset) < (4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final int mapId = UnsafeBits.getInt(b, offset); - offset += 4; - final int stringLength = UnsafeBits.getInt(b, offset); - offset += 4; - - if ((readSize - offset) < stringLength) { - return createUnreadBytesArray(b, readSize, offset - 8, - true); - } - - final byte[] stringBytes = new byte[stringLength]; - System.arraycopy(b, offset, stringBytes, 0, stringLength); - final String string = new String(stringBytes); - offset += stringLength; - - addToRegistry(mapId, string); - - break; - } - default: { - System.out.println("unknown class id " + clazzId - + " at offset " + (offset - 4)); - return null; - } - } - } - - return null; - } - - private int readInTraceMetadata(final byte[] b, int offset) { - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int hostnameId = UnsafeBits.getInt(b, offset); - offset += 4; - final long parentTraceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int parentOrderId = UnsafeBits.getInt(b, offset); - offset += 4; - final int applicationId = UnsafeBits.getInt(b, offset); - offset += 4; - - final String hostname = getStringFromRegistry(hostnameId); - final String application = getStringFromRegistry(applicationId); - - if ((hostname != null) && (application != null)) { - putInRingBuffer(new TraceMetadata(traceId, hostname, parentTraceId, - parentOrderId, application)); - } else { - final byte[] message = new byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID]; - System.arraycopy(b, offset - - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0, - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID); - putInWaitingMessages(message); - } - return offset; - } - - private int readInBeforeOperationEvent(final byte[] b, int offset) { - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - final String operation = getStringFromRegistry(operationId); - - if (operation != null) { - putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, - orderIndex, operation)); - } else { - final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - System.arraycopy(b, offset - - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, - 0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - putInWaitingMessages(message); - } - return offset; - } - - private int readInAfterFailedOperationEvent(final byte[] b, int offset) { - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - final int causeId = UnsafeBits.getInt(b, offset); - offset += 4; - - final String operation = getStringFromRegistry(operationId); - final String cause = getStringFromRegistry(causeId); - - if ((operation != null) && (cause != null)) { - putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, - orderIndex, operation, cause)); - } else { - final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - System.arraycopy(b, offset - - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, - message, 0, - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - putInWaitingMessages(message); - } - return offset; - } - - private int readInAfterOperationEvent(final byte[] b, int offset) { - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - final String operation = getStringFromRegistry(operationId); - if (operation != null) { - putInRingBuffer(new AfterOperationEvent(timestamp, traceId, - orderIndex, operation)); - } else { - final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - System.arraycopy(b, offset - - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, - 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - putInWaitingMessages(message); - } - - return offset; - } - - private void putInWaitingMessages(final byte[] message) { - waitingForStringMessages.add(message); - } - - private byte[] createUnreadBytesArray(final byte[] b, final int readSize, - int offset, final boolean withClazzId) { - if (withClazzId) { - offset -= 4; - } - final int unreadBytesSize = readSize - offset; - final byte[] unreadBytes = new byte[unreadBytesSize]; - System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); - return unreadBytes; - } - - private void putInRingBuffer(final IRecord record) { - counter.inputObjects(record); - // final long hiseq = ringBuffer.next(); - // final RecordEvent valueEvent = ringBuffer.get(hiseq); - // valueEvent.setValue(record); - // ringBuffer.publish(hiseq); - } - - public void addToRegistry(final int key, final String value) { - stringRegistry.put(key, value); - - // System.out.println("put key " + key + " value " + value); - - checkWaitingMessages(); - } - - private void checkWaitingMessages() { - final List<byte[]> localWaitingList = new ArrayList<byte[]>(); - for (final byte[] waitingMessage : waitingForStringMessages) { - localWaitingList.add(waitingMessage); - } - waitingForStringMessages.clear(); - - for (final byte[] waitingMessage : localWaitingList) { - final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage, - 0); - switch (waitingMessageClazzId) { - case TraceMetadata.CLAZZ_ID: - readInTraceMetadata(waitingMessage, 4); - break; - case BeforeOperationEvent.CLAZZ_ID: - readInBeforeOperationEvent(waitingMessage, 4); - break; - case AfterFailedOperationEvent.CLAZZ_ID: - readInAfterFailedOperationEvent(waitingMessage, 4); - break; - case AfterOperationEvent.CLAZZ_ID: - readInAfterOperationEvent(waitingMessage, 4); - break; - default: - break; - } - } - } - - private String getStringFromRegistry(final int id) { - return stringRegistry.get(id); - } -} diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 7180ab05771eac7622d871bfc5983aed4745e803..61e6a190e363e3a1eb0c94313d3df91fb2e3133a 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -2,9 +2,12 @@ package explorviz.hpc_monitoring.reader; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -12,21 +15,37 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; +import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; +import explorviz.hpc_monitoring.record.HostApplicationMetadata; +import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; +import gnu.trove.map.hash.TIntObjectHashMap; public final class TCPReader { private static final int MESSAGE_BUFFER_SIZE = 131072; - private final int listeningPort; private ServerSocketChannel serversocket; private boolean active = true; - private final RingBuffer<ByteArrayEvent> ringBuffer; + private final RingBuffer<RecordEvent> ringBuffer; private final ByteBuffer buffer; + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Records per second"); + + private final static TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>( + 64); + + private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>( + 1024); + private HostApplicationMetadata hostApplicationMetadata; + public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) throws IllegalArgumentException { @@ -35,12 +54,13 @@ public final class TCPReader { buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( - ByteArrayEvent.EVENT_FACTORY, 32, exec); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 32768, exec); @SuppressWarnings("unchecked") - final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new MessageDistributer(endReceiver); + final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new TraceReconstructionFilter(1 * 1000 * 1000, + endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @@ -51,13 +71,9 @@ public final class TCPReader { while (active) { // TODO only one connection! final SocketChannel socketChannel = serversocket.accept(); - int readBytes = 0; - while ((readBytes = socketChannel.read(buffer)) != -1) { - final byte[] messages = new byte[readBytes]; + while ((socketChannel.read(buffer)) != -1) { buffer.flip(); - buffer.get(messages, 0, readBytes); - putInRingBuffer(messages, readBytes); - buffer.clear(); + messagesfromByteArray(buffer); } serversocket.close(); @@ -79,16 +95,197 @@ public final class TCPReader { System.out.println("listening on port " + listeningPort); } - private void putInRingBuffer(final byte[] messages, final int readBytes) { - final long hiseq = ringBuffer.next(); - final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(messages); - valueEvent.setLength(readBytes); - ringBuffer.publish(hiseq); + private void putInRingBuffer(final IRecord message) { + counter.inputObjects(message); + // System.out.println(message.toString()); + // final long hiseq = ringBuffer.next(); + // final RecordEvent valueEvent = ringBuffer.get(hiseq); + // valueEvent.setValue(message); + // ringBuffer.publish(hiseq); } public void terminate(final boolean error) { System.out.println("Shutdown of TCPReader requested."); active = false; } + + public void messagesfromByteArray(final ByteBuffer buffer) { + while (buffer.remaining() > 0) { + buffer.mark(); + try { + final byte clazzId = buffer.get(); + + switch (clazzId) { + case HostApplicationMetadata.CLAZZ_ID: { + readInTraceMetadata(buffer); + break; + } + case BeforeOperationEvent.CLAZZ_ID: { + readInBeforeOperationEvent(buffer); + break; + } + case AfterFailedOperationEvent.CLAZZ_ID: { + readInAfterFailedOperationEvent(buffer); + break; + } + case AfterOperationEvent.CLAZZ_ID: { + readInAfterOperationEvent(buffer); + break; + } + case 4: { + final int mapId = buffer.getInt(); + final int stringLength = buffer.getInt(); + + final byte[] stringByteArray = new byte[stringLength]; + + buffer.get(stringByteArray); + + addToRegistry(mapId, new String(stringByteArray)); + break; + } + default: { + System.out.println("unknown class id " + clazzId + + " at offset " + (buffer.position() - 4)); + } + } + } catch (final BufferUnderflowException e) { + buffer.reset(); + buffer.compact(); + return; + } + } + + buffer.clear(); + } + + private final void readInTraceMetadata(final ByteBuffer buffer) { + final int hostnameId = buffer.getInt(); + final int applicationId = buffer.getInt(); + + final String hostname = getStringFromRegistry(hostnameId); + final String application = getStringFromRegistry(applicationId); + + // if ((hostname != null) && (application != null)) { + hostApplicationMetadata = new HostApplicationMetadata(hostname, + application); + // } else { + // final byte[] message = new + // byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID]; + // System.arraycopy(b, offset + // - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0, + // TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID); + // putInWaitingMessages(message); + // } + } + + private final void readInBeforeOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + + // if (operation != null) { + putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, + orderIndex, operation)); + // } else { + // final byte[] message = new + // byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + // System.arraycopy(b, offset + // - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, + // 0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + // putInWaitingMessages(message); + // } + } + + private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + final int causeId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + final String cause = getStringFromRegistry(causeId); + + // if ((operation != null) && (cause != null)) { + putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, + orderIndex, operation, cause)); + // } else { + // final byte[] message = new + // byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + // System.arraycopy(b, offset + // - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, + // message, 0, + // AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + // putInWaitingMessages(message); + // } + } + + private final void readInAfterOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + // if (operation != null) { + putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex, + operation)); + // } else { + // final byte[] message = new + // byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + // System.arraycopy(b, offset + // - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, + // 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + // putInWaitingMessages(message); + // } + + } + + private void putInWaitingMessages(final byte[] message) { + waitingForStringMessages.add(message); + } + + private void checkWaitingMessages() { + final List<byte[]> localWaitingList = new ArrayList<byte[]>(); + for (final byte[] waitingMessage : waitingForStringMessages) { + localWaitingList.add(waitingMessage); + } + waitingForStringMessages.clear(); + + // for (final byte[] waitingMessage : localWaitingList) { + // final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage, + // 0); + // switch (waitingMessageClazzId) { + // case TraceMetadata.CLAZZ_ID: + // readInTraceMetadata(waitingMessage, 4); + // break; + // case BeforeOperationEvent.CLAZZ_ID: + // readInBeforeOperationEvent(waitingMessage, 4); + // break; + // case AfterFailedOperationEvent.CLAZZ_ID: + // readInAfterFailedOperationEvent(waitingMessage, 4); + // break; + // case AfterOperationEvent.CLAZZ_ID: + // readInAfterOperationEvent(waitingMessage, 4); + // break; + // default: + // break; + // } + // } + } + + public void addToRegistry(final int key, final String value) { + stringRegistry.put(key, value); + + // System.out.println("put key " + key + " value " + value); + + checkWaitingMessages(); + } + + private String getStringFromRegistry(final int id) { + return stringRegistry.get(id); + } }