diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index a220139d24e3d31d2407052cd17fad3e175c1745..b28c915d9ac7e8d0ac2420c8203f65096b052d54 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -46,8 +46,8 @@ public class TraceBuffer { } if (!events.add(event)) { - // System.out.println("Duplicate entry for orderIndex " + orderIndex - // + " with traceId " + traceMetadata.getTraceId()); + System.out.println("Duplicate entry for orderIndex " + orderIndex + + " with traceId " + event.getTraceId()); damaged = true; } } @@ -69,8 +69,6 @@ public class TraceBuffer { public void setTrace(final HostApplicationMetadata trace) { if (traceMetadata != null) { - // System.out.println("Duplicate Trace entry for traceId " - // + trace.getTraceId()); damaged = true; return; } @@ -86,7 +84,7 @@ public class TraceBuffer { || (openEvents != 0) || (traceMetadata == null) || damaged); } - public final Trace toTrace() { // TODO still slow? + public final Trace toTrace() { final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events .size()]; final Iterator<AbstractOperationEvent> iterator = events.iterator(); diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index ef14c0ec3646d9bcfcf6cf70e4b0ad7c7a9101a9..5c590aeedd364e1b908d8f7cc9904b111fcb2589 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -40,12 +40,12 @@ public final class TraceReconstructionFilter implements RecordEvent.EVENT_FACTORY, 16384, exec); final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(2 * 1000 * 1000, + eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - new TimeReader(2 * 1000, this).start(); + new TimeReader(1 * 1000, this).start(); } @Override @@ -67,14 +67,14 @@ public final class TraceReconstructionFilter implements private void sendOutValidTrace(final Trace trace) { counter.inputObjects(trace); - // putInRingBuffer(trace); + putInRingBuffer(trace); } private void sendOutInvalidTrace(final Trace trace) { // counter.inputObjects(trace); // putInRingBuffer(trace); // TODO - // System.out.println("Invalid trace: " - // + trace.getTraceMetadata().getTraceId()); + System.out.println("Invalid trace: " + + trace.getTraceEvents()[0].getTraceId()); } private void putInRingBuffer(final IRecord record) { @@ -88,17 +88,12 @@ public final class TraceReconstructionFilter implements public void onEvent(final RecordEvent event, final long sequence, final boolean endOfBatch) throws Exception { final IRecord record = event.getValue(); - if (record instanceof HostApplicationMetadata) { - final HostApplicationMetadata traceMetadata = ((HostApplicationMetadata) record); - - // final long traceId = traceMetadata.getTraceId(); - // final TraceBuffer traceBuffer = getBufferForTraceId(traceId); - // traceBuffer.setTrace(traceMetadata); - } else if (record instanceof AbstractOperationEvent) { + if (record instanceof AbstractOperationEvent) { final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + final TraceBuffer traceBuffer = getBufferForTraceId(traceId, + event.getMetadata()); traceBuffer.insertEvent(abstractOperationEvent); if (traceBuffer.isFinished()) { @@ -108,10 +103,12 @@ public final class TraceReconstructionFilter implements } } - private TraceBuffer getBufferForTraceId(final long traceId) { + private TraceBuffer getBufferForTraceId(final long traceId, + final HostApplicationMetadata metadata) { TraceBuffer traceBuffer = traceId2trace.get(traceId); if (traceBuffer == null) { traceBuffer = new TraceBuffer(); + traceBuffer.setTrace(metadata); traceId2trace.put(traceId, traceBuffer); } return traceBuffer; diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 29a5b016f6d37cc5438ee838ee99b60410ab781b..c34630c00db6b9064d12e39fd9c162d708a967e7 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -95,7 +95,7 @@ public class TracePatternSummarizationFilter implements TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); if (traceAggregationBuffer == null) { traceAggregationBuffer = new TraceAggregationBuffer( - System.nanoTime()); + System.currentTimeMillis()); trace2buffer.put(trace, traceAggregationBuffer); } traceAggregationBuffer.insertTrace(trace); diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 61e6a190e363e3a1eb0c94313d3df91fb2e3133a..366d222ef44b471d7aa32194ea52b8d103d5ba66 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -97,11 +97,11 @@ public final class TCPReader { private void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - // System.out.println(message.toString()); - // final long hiseq = ringBuffer.next(); - // final RecordEvent valueEvent = ringBuffer.get(hiseq); - // valueEvent.setValue(message); - // ringBuffer.publish(hiseq); + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(message); + valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); } public void terminate(final boolean error) { @@ -165,17 +165,16 @@ public final class TCPReader { final String hostname = getStringFromRegistry(hostnameId); final String application = getStringFromRegistry(applicationId); - // if ((hostname != null) && (application != null)) { - hostApplicationMetadata = new HostApplicationMetadata(hostname, - application); - // } else { - // final byte[] message = new - // byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID]; - // System.arraycopy(b, offset - // - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0, - // TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID); - // putInWaitingMessages(message); - // } + if ((hostname != null) && (application != null)) { + hostApplicationMetadata = new HostApplicationMetadata(hostname, + application); + } else { + final byte[] message = new byte[HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() + - HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } } private final void readInBeforeOperationEvent(final ByteBuffer buffer) { @@ -186,17 +185,16 @@ public final class TCPReader { final String operation = getStringFromRegistry(operationId); - // if (operation != null) { - putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, - orderIndex, operation)); - // } else { - // final byte[] message = new - // byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - // System.arraycopy(b, offset - // - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, - // 0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - // putInWaitingMessages(message); - // } + if (operation != null) { + putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, + orderIndex, operation)); + } else { + final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() + - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } } private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) { @@ -209,18 +207,16 @@ public final class TCPReader { final String operation = getStringFromRegistry(operationId); final String cause = getStringFromRegistry(causeId); - // if ((operation != null) && (cause != null)) { - putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, - orderIndex, operation, cause)); - // } else { - // final byte[] message = new - // byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - // System.arraycopy(b, offset - // - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, - // message, 0, - // AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - // putInWaitingMessages(message); - // } + if ((operation != null) && (cause != null)) { + putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, + orderIndex, operation, cause)); + } else { + final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() + - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } } private final void readInAfterOperationEvent(final ByteBuffer buffer) { @@ -230,17 +226,16 @@ public final class TCPReader { final int operationId = buffer.getInt(); final String operation = getStringFromRegistry(operationId); - // if (operation != null) { - putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex, - operation)); - // } else { - // final byte[] message = new - // byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - // System.arraycopy(b, offset - // - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message, - // 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - // putInWaitingMessages(message); - // } + if (operation != null) { + putInRingBuffer(new AfterOperationEvent(timestamp, traceId, + orderIndex, operation)); + } else { + final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() + - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } } @@ -255,26 +250,26 @@ public final class TCPReader { } waitingForStringMessages.clear(); - // for (final byte[] waitingMessage : localWaitingList) { - // final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage, - // 0); - // switch (waitingMessageClazzId) { - // case TraceMetadata.CLAZZ_ID: - // readInTraceMetadata(waitingMessage, 4); - // break; - // case BeforeOperationEvent.CLAZZ_ID: - // readInBeforeOperationEvent(waitingMessage, 4); - // break; - // case AfterFailedOperationEvent.CLAZZ_ID: - // readInAfterFailedOperationEvent(waitingMessage, 4); - // break; - // case AfterOperationEvent.CLAZZ_ID: - // readInAfterOperationEvent(waitingMessage, 4); - // break; - // default: - // break; - // } - // } + for (final byte[] waitingMessage : localWaitingList) { + final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); + final byte waitingMessageClazzId = buffer.get(); + switch (waitingMessageClazzId) { + case HostApplicationMetadata.CLAZZ_ID: + readInTraceMetadata(buffer); + break; + case BeforeOperationEvent.CLAZZ_ID: + readInBeforeOperationEvent(buffer); + break; + case AfterFailedOperationEvent.CLAZZ_ID: + readInAfterFailedOperationEvent(buffer); + break; + case AfterOperationEvent.CLAZZ_ID: + readInAfterOperationEvent(buffer); + break; + default: + break; + } + } } public void addToRegistry(final int key, final String value) {