From a0d209fe2eb2bb4811b8fde6453e761383b505e5 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Tue, 27 Aug 2013 09:29:15 +0200 Subject: [PATCH] summarization now working --- .../TraceReconstructionFilter.java | 210 +++++---- .../TracePatternSummarizationFilter.java | 202 ++++---- .../reader/IPeriodicTimeSignalReceiver.java | 7 + .../reader/MessageDistributer.java | 441 +++++++++--------- .../hpc_monitoring/reader/TCPReader.java | 121 ++--- .../hpc_monitoring/reader/TimeReader.java | 78 ++-- .../worker/main/WorkerController.java | 8 +- 7 files changed, 555 insertions(+), 512 deletions(-) create mode 100644 src/explorviz/hpc_monitoring/reader/IPeriodicTimeSignalReceiver.java diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index db4cd7a..09aa8a6 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -2,115 +2,125 @@ package explorviz.hpc_monitoring.filter.reconstruction; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; + import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; +import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; import explorviz.hpc_monitoring.reader.RecordEvent; -import explorviz.hpc_monitoring.record.*; +import explorviz.hpc_monitoring.reader.TimeReader; +import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.TraceMetadata; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; import gnu.trove.iterator.TLongObjectIterator; import gnu.trove.map.hash.TLongObjectHashMap; public final class TraceReconstructionFilter implements - EventHandler<RecordEvent> { - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Reconstructed traces per second"); - - private final long maxTraceTimeout; - - private final TLongObjectHashMap<TraceBuffer> traceId2trace = new TLongObjectHashMap<TraceBuffer>( - 1024); - private final RingBuffer<RecordEvent> ringBuffer; - - @SuppressWarnings("unchecked") - public TraceReconstructionFilter(final long maxTraceTimeout) { - this.maxTraceTimeout = maxTraceTimeout; - - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, 16384, exec); - - final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(5 * 1000 * 1000); - disruptor.handleEventsWith(eventHandlers); - ringBuffer = disruptor.start(); - } - - public void periodicTimeSignal(final long timestamp) { - checkForTimeouts(timestamp); - } - - private void checkForTimeouts(final long timestamp) { - final long traceTimeout = timestamp - maxTraceTimeout; - for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace - .iterator(); iterator.hasNext(); iterator.advance()) { - final TraceBuffer traceBuffer = iterator.value(); - if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { - sendOutInvalidTrace(traceBuffer.toTrace()); - iterator.remove(); - } - } - } - - private void sendOutValidTrace(final Trace trace) { - counter.inputObjects(trace); - // putInRingBuffer(trace); - } - - private void sendOutInvalidTrace(final Trace trace) { - counter.inputObjects(trace); - // putInRingBuffer(trace); - } - - private void putInRingBuffer(final IRecord record) { - final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(record); - ringBuffer.publish(hiseq); - } - - @Override - public void onEvent(final RecordEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - final IRecord record = event.getValue(); - if (record instanceof TraceMetadata) { - final TraceMetadata traceMetadata = ((TraceMetadata) record); - - final long traceId = traceMetadata.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId(traceId); - traceBuffer.setTrace(traceMetadata); - } - else if (record instanceof AbstractOperationEvent) { - final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); - - final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId(traceId); - traceBuffer.insertEvent(abstractOperationEvent); - - if (traceBuffer.isFinished()) { - traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace()); - } - } - } - - private TraceBuffer getBufferForTraceId(final long traceId) { - TraceBuffer traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { - traceBuffer = new TraceBuffer(); - traceId2trace.put(traceId, traceBuffer); - } - return traceBuffer; - } - - public void terminate() { - for (final Object entry : traceId2trace.values()) { - if (entry instanceof TraceBuffer) { - sendOutInvalidTrace(((TraceBuffer) entry).toTrace()); - } - } - traceId2trace.clear(); - } + EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver { + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Reconstructed traces per second"); + + private final long maxTraceTimeout; + + private final TLongObjectHashMap<TraceBuffer> traceId2trace = new TLongObjectHashMap<TraceBuffer>( + 1024); + private final RingBuffer<RecordEvent> ringBuffer; + + @SuppressWarnings("unchecked") + public TraceReconstructionFilter(final long maxTraceTimeout, + final EventHandler<RecordEvent> endReceiver) { + this.maxTraceTimeout = maxTraceTimeout; + + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 16384, exec); + + final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new TracePatternSummarizationFilter(5 * 1000 * 1000, + endReceiver); + disruptor.handleEventsWith(eventHandlers); + ringBuffer = disruptor.start(); + + new TimeReader(1 * 1000 * 1000, this).start(); + } + + @Override + public void periodicTimeSignal(final long timestamp) { + checkForTimeouts(timestamp); + } + + private void checkForTimeouts(final long timestamp) { + final long traceTimeout = timestamp - maxTraceTimeout; + for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace + .iterator(); iterator.hasNext(); iterator.advance()) { + final TraceBuffer traceBuffer = iterator.value(); + if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { + sendOutInvalidTrace(traceBuffer.toTrace()); + iterator.remove(); + } + } + } + + private void sendOutValidTrace(final Trace trace) { + counter.inputObjects(trace); + putInRingBuffer(trace); + } + + private void sendOutInvalidTrace(final Trace trace) { + counter.inputObjects(trace); + putInRingBuffer(trace); // TODO + } + + private void putInRingBuffer(final IRecord record) { + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(record); + ringBuffer.publish(hiseq); + } + + @Override + public void onEvent(final RecordEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + final IRecord record = event.getValue(); + if (record instanceof TraceMetadata) { + final TraceMetadata traceMetadata = ((TraceMetadata) record); + + final long traceId = traceMetadata.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + traceBuffer.setTrace(traceMetadata); + } else if (record instanceof AbstractOperationEvent) { + final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + + final long traceId = abstractOperationEvent.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + traceBuffer.insertEvent(abstractOperationEvent); + + if (traceBuffer.isFinished()) { + traceId2trace.remove(traceId); + sendOutValidTrace(traceBuffer.toTrace()); + } + } + } + + private TraceBuffer getBufferForTraceId(final long traceId) { + TraceBuffer traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { + traceBuffer = new TraceBuffer(); + traceId2trace.put(traceId, traceBuffer); + } + return traceBuffer; + } + + public void terminate() { + for (final Object entry : traceId2trace.values()) { + if (entry instanceof TraceBuffer) { + sendOutInvalidTrace(((TraceBuffer) entry).toTrace()); + } + } + traceId2trace.clear(); + } } \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 47dd1bb..e32960f 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -1,90 +1,132 @@ package explorviz.hpc_monitoring.filter.reduction; -import java.util.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; + +import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; import explorviz.hpc_monitoring.reader.RecordEvent; +import explorviz.hpc_monitoring.reader.TimeReader; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; public class TracePatternSummarizationFilter implements - EventHandler<RecordEvent> { - private final long maxCollectionDuration; - private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>( - new TraceComperator()); - - public TracePatternSummarizationFilter(final long maxCollectionDuration) { - this.maxCollectionDuration = maxCollectionDuration; - } - - public void periodicTimeSignal(final Long timestamp) { - processTimeoutQueue(timestamp); - } - - private void processTimeoutQueue(final long timestamp) { - final long bufferTimeout = timestamp - maxCollectionDuration; - final List<Trace> toRemove = new ArrayList<Trace>(); - for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { - if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { - final Trace aggregatedTrace = traceBuffer.getAggregatedTrace(); - sendOutTrace(aggregatedTrace); - toRemove.add(aggregatedTrace); - } - } - for (final Trace traceEventRecords : toRemove) { - trace2buffer.remove(traceEventRecords); - } - } - - private void sendOutTrace(final Trace aggregatedTrace) { - // TODO - } - - @Override - public void onEvent(final RecordEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - final IRecord value = event.getValue(); - if (value instanceof Trace) { - insertIntoBuffer((Trace) value); - } - } - - private void insertIntoBuffer(final Trace trace) { - TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); - if (traceAggregationBuffer == null) { - traceAggregationBuffer = new TraceAggregationBuffer( - System.nanoTime()); - trace2buffer.put(trace, traceAggregationBuffer); - } - traceAggregationBuffer.insertTrace(trace); - } - - public void terminate(final boolean error) { - for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { - sendOutTrace(traceBuffer.getAggregatedTrace()); - } - trace2buffer.clear(); - } - - private static final class TraceComperator implements Comparator<Trace> { - - @Override - public int compare(final Trace t1, final Trace t2) { - final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents(); - final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents(); - - if ((recordsT1.length - recordsT2.length) != 0) { - return recordsT1.length - recordsT2.length; - } - - final int cmpHostnames = t1.getTraceMetadata().getHostname() - .compareTo(t2.getTraceMetadata().getHostname()); - if (cmpHostnames != 0) { - return cmpHostnames; - } - - // TODO deep check records - return 0; - } - } + EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver { + private final long maxCollectionDuration; + + private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>( + new TraceComperator()); + + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Reduced trace results per second"); + + private final RingBuffer<RecordEvent> ringBuffer; + + public TracePatternSummarizationFilter(final long maxCollectionDuration, + final EventHandler<RecordEvent> endReceiver) { + this.maxCollectionDuration = maxCollectionDuration; + + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 128, exec); + + @SuppressWarnings("unchecked") + final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = endReceiver; + if (endReceiver != null) { + disruptor.handleEventsWith(eventHandlers); + } + ringBuffer = disruptor.start(); + new TimeReader(1 * 1000 * 1000, this).start(); + } + + @Override + public void periodicTimeSignal(final long timestamp) { + processTimeoutQueue(timestamp); + } + + private void processTimeoutQueue(final long timestamp) { + final long bufferTimeout = timestamp - maxCollectionDuration; + final List<Trace> toRemove = new ArrayList<Trace>(); + for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { + final Trace aggregatedTrace = traceBuffer.getAggregatedTrace(); + sendOutTrace(aggregatedTrace); + toRemove.add(aggregatedTrace); + } + } + for (final Trace traceEventRecords : toRemove) { + trace2buffer.remove(traceEventRecords); + } + } + + private void sendOutTrace(final Trace aggregatedTrace) { + counter.inputObjects(aggregatedTrace); + putInRingBuffer(aggregatedTrace); + } + + private void putInRingBuffer(final IRecord record) { + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(record); + ringBuffer.publish(hiseq); + } + + @Override + public void onEvent(final RecordEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + final IRecord value = event.getValue(); + if (value instanceof Trace) { + insertIntoBuffer((Trace) value); + } + } + + private void insertIntoBuffer(final Trace trace) { + TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); + if (traceAggregationBuffer == null) { + traceAggregationBuffer = new TraceAggregationBuffer( + System.nanoTime()); + trace2buffer.put(trace, traceAggregationBuffer); + } + traceAggregationBuffer.insertTrace(trace); + } + + public void terminate(final boolean error) { + for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + sendOutTrace(traceBuffer.getAggregatedTrace()); + } + trace2buffer.clear(); + } + + private static final class TraceComperator implements Comparator<Trace> { + + @Override + public int compare(final Trace t1, final Trace t2) { + final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents(); + final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents(); + + if ((recordsT1.length - recordsT2.length) != 0) { + return recordsT1.length - recordsT2.length; + } + + final int cmpHostnames = t1.getTraceMetadata().getHostname() + .compareTo(t2.getTraceMetadata().getHostname()); + if (cmpHostnames != 0) { + return cmpHostnames; + } + + // TODO deep check records + return 0; + } + } } diff --git a/src/explorviz/hpc_monitoring/reader/IPeriodicTimeSignalReceiver.java b/src/explorviz/hpc_monitoring/reader/IPeriodicTimeSignalReceiver.java new file mode 100644 index 0000000..d145afb --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/IPeriodicTimeSignalReceiver.java @@ -0,0 +1,7 @@ +package explorviz.hpc_monitoring.reader; + +public interface IPeriodicTimeSignalReceiver { + + void periodicTimeSignal(long timestamp); + +} diff --git a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java index 29780c4..3d88fd1 100644 --- a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java +++ b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java @@ -2,235 +2,238 @@ package explorviz.hpc_monitoring.reader; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; + import explorviz.hpc_monitoring.byteaccess.UnsafeBits; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TraceMetadata; -import explorviz.hpc_monitoring.record.events.normal.*; +import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; import gnu.trove.map.hash.TIntObjectHashMap; public class MessageDistributer implements EventHandler<ByteArrayEvent> { - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Records per second"); - - private final TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>( - 64); - - private byte[] unreadBytes = null; - - private final RingBuffer<RecordEvent> ringBuffer; - - @SuppressWarnings("unchecked") - public MessageDistributer() { - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, 32768, exec); - - final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(5 * 1000 * 1000); - disruptor.handleEventsWith(eventHandlers); - ringBuffer = disruptor.start(); - } - - @Override - public void onEvent(final ByteArrayEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - final byte[] received = event.getValue(); - final int receivedLength = event.getLength(); - - byte[] messages = received; - int messagesLength = receivedLength; - - if (unreadBytes != null) { - final int unreadBytesLength = unreadBytes.length; - - messagesLength += unreadBytesLength; - messages = new byte[messagesLength]; - - System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); - System.arraycopy(received, 0, messages, unreadBytesLength, - receivedLength); - } - - unreadBytes = messagesfromByteArray(messages, messagesLength); - } - - private byte[] messagesfromByteArray(final byte[] b, final int readSize) { - int offset = 0; - - while (offset < readSize) { - if ((readSize - offset) < 4) { - return createUnreadBytesArray(b, readSize, offset, false); - } - - final int clazzId = UnsafeBits.getInt(b, offset); - offset += 4; - - IRecord record = null; - - switch (clazzId) { - case 0: { - if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int hostnameId = UnsafeBits.getInt(b, offset); - offset += 4; - final long parentTraceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int parentOrderId = UnsafeBits.getInt(b, offset); - offset += 4; - final int applicationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new TraceMetadata(traceId, - getStringFromRegistry(hostnameId), parentTraceId, - parentOrderId, getStringFromRegistry(applicationId)); - break; - } - case 1: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new BeforeOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId)); - break; - } - case 2: { - if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - final int causeId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new AfterFailedOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId), - getStringFromRegistry(causeId)); - break; - } - case 3: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final int operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new AfterOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId)); - break; - } - case 4: { - if ((readSize - offset) < (4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final int mapId = UnsafeBits.getInt(b, offset); - offset += 4; - final int stringLength = UnsafeBits.getInt(b, offset); - offset += 4; - - if ((readSize - offset) < stringLength) { - return createUnreadBytesArray(b, readSize, offset - 8, - true); - } - - final byte[] stringBytes = new byte[stringLength]; - System.arraycopy(b, offset, stringBytes, 0, stringLength); - final String string = new String(stringBytes); - offset += stringLength; - - addToRegistry(mapId, string); - - break; - } - default: { - System.out.println("unknown class id " + clazzId - + " at offset " + (offset - 4)); - return null; - } - } - - putInRingBuffer(record); - } - - return null; - } - - private byte[] createUnreadBytesArray(final byte[] b, final int readSize, - int offset, final boolean withClazzId) { - if (withClazzId) { - offset -= 4; - } - final int unreadBytesSize = readSize - offset; - final byte[] unreadBytes = new byte[unreadBytesSize]; - System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); - return unreadBytes; - } - - private void putInRingBuffer(final IRecord record) { - counter.inputObjects(record); - final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(record); - ringBuffer.publish(hiseq); - } - - public void addToRegistry(final int key, final String value) { - stringRegistry.put(key, value); - - synchronized (this) { - notifyAll(); - } - } - - private String getStringFromRegistry(final int id) { - String result = stringRegistry.get(id); - while (result == null) { - try { - synchronized (this) { - System.out.println("waiting for " + id); - this.wait(); - } - } - catch (final InterruptedException e) { - e.printStackTrace(); - } - result = stringRegistry.get(id); - } - - return result; - } + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Records per second"); + + private final TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>( + 64); + + private byte[] unreadBytes = null; + + private final RingBuffer<RecordEvent> ringBuffer; + + @SuppressWarnings("unchecked") + public MessageDistributer(final EventHandler<RecordEvent> endReceiver) { + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 32768, exec); + + final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new TraceReconstructionFilter(5 * 1000 * 1000, + endReceiver); + disruptor.handleEventsWith(eventHandlers); + ringBuffer = disruptor.start(); + } + + @Override + public void onEvent(final ByteArrayEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + final byte[] received = event.getValue(); + final int receivedLength = event.getLength(); + + byte[] messages = received; + int messagesLength = receivedLength; + + if (unreadBytes != null) { + final int unreadBytesLength = unreadBytes.length; + + messagesLength += unreadBytesLength; + messages = new byte[messagesLength]; + + System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); + System.arraycopy(received, 0, messages, unreadBytesLength, + receivedLength); + } + + unreadBytes = messagesfromByteArray(messages, messagesLength); + } + + private byte[] messagesfromByteArray(final byte[] b, final int readSize) { + int offset = 0; + + while (offset < readSize) { + if ((readSize - offset) < 4) { + return createUnreadBytesArray(b, readSize, offset, false); + } + + final int clazzId = UnsafeBits.getInt(b, offset); + offset += 4; + + IRecord record = null; + + switch (clazzId) { + case 0: { + if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int hostnameId = UnsafeBits.getInt(b, offset); + offset += 4; + final long parentTraceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int parentOrderId = UnsafeBits.getInt(b, offset); + offset += 4; + final int applicationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new TraceMetadata(traceId, + getStringFromRegistry(hostnameId), parentTraceId, + parentOrderId, getStringFromRegistry(applicationId)); + break; + } + case 1: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final int operationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new BeforeOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + case 2: { + if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final int operationId = UnsafeBits.getInt(b, offset); + offset += 4; + final int causeId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new AfterFailedOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId), + getStringFromRegistry(causeId)); + break; + } + case 3: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final int operationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new AfterOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + case 4: { + if ((readSize - offset) < (4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final int mapId = UnsafeBits.getInt(b, offset); + offset += 4; + final int stringLength = UnsafeBits.getInt(b, offset); + offset += 4; + + if ((readSize - offset) < stringLength) { + return createUnreadBytesArray(b, readSize, offset - 8, true); + } + + final byte[] stringBytes = new byte[stringLength]; + System.arraycopy(b, offset, stringBytes, 0, stringLength); + final String string = new String(stringBytes); + offset += stringLength; + + addToRegistry(mapId, string); + + break; + } + default: { + System.out.println("unknown class id " + clazzId + + " at offset " + (offset - 4)); + return null; + } + } + + putInRingBuffer(record); + } + + return null; + } + + private byte[] createUnreadBytesArray(final byte[] b, final int readSize, + int offset, final boolean withClazzId) { + if (withClazzId) { + offset -= 4; + } + final int unreadBytesSize = readSize - offset; + final byte[] unreadBytes = new byte[unreadBytesSize]; + System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); + return unreadBytes; + } + + private void putInRingBuffer(final IRecord record) { + counter.inputObjects(record); + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(record); + ringBuffer.publish(hiseq); + } + + public void addToRegistry(final int key, final String value) { + stringRegistry.put(key, value); + + synchronized (this) { + notifyAll(); + } + } + + private String getStringFromRegistry(final int id) { + String result = stringRegistry.get(id); + while (result == null) { + try { + synchronized (this) { + System.out.println("waiting for " + id); + this.wait(); + } + } catch (final InterruptedException e) { + e.printStackTrace(); + } + result = stringRegistry.get(id); + } + + return result; + } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 91f6a03..970e4be 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -6,80 +6,81 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public final class TCPReader { - private static final int MESSAGE_BUFFER_SIZE = 65536; + private static final int MESSAGE_BUFFER_SIZE = 65536; - private final int listeningPort; + private final int listeningPort; - private ServerSocket serversocket; - private boolean active = true; + private ServerSocket serversocket; + private boolean active = true; - private final RingBuffer<ByteArrayEvent> ringBuffer; + private final RingBuffer<ByteArrayEvent> ringBuffer; - public TCPReader(final int listeningPort) throws IllegalArgumentException { - this.listeningPort = listeningPort; + public TCPReader(final int listeningPort, + final EventHandler<RecordEvent> endReceiver) + throws IllegalArgumentException { + this.listeningPort = listeningPort; - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( - ByteArrayEvent.EVENT_FACTORY, 4096, exec); + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( + ByteArrayEvent.EVENT_FACTORY, 4096, exec); - @SuppressWarnings("unchecked") - final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new MessageDistributer(); - disruptor.handleEventsWith(eventHandlers); - ringBuffer = disruptor.start(); - } + @SuppressWarnings("unchecked") + final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new MessageDistributer(endReceiver); + disruptor.handleEventsWith(eventHandlers); + ringBuffer = disruptor.start(); + } - public void read() { - try { - open(); - while (active) { - // TODO only one connection! - final Socket socket = serversocket.accept(); - final BufferedInputStream bufferedInputStream = new BufferedInputStream( - socket.getInputStream(), MESSAGE_BUFFER_SIZE); - int readBytes = 0; - byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; - while ((readBytes = bufferedInputStream.read(messages, 0, - MESSAGE_BUFFER_SIZE)) != -1) { - putInRingBuffer(messages, readBytes); - messages = new byte[MESSAGE_BUFFER_SIZE]; - } + public void read() { + try { + open(); + while (active) { + // TODO only one connection! + final Socket socket = serversocket.accept(); + final BufferedInputStream bufferedInputStream = new BufferedInputStream( + socket.getInputStream(), MESSAGE_BUFFER_SIZE); + int readBytes = 0; + byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; + while ((readBytes = bufferedInputStream.read(messages, 0, + MESSAGE_BUFFER_SIZE)) != -1) { + putInRingBuffer(messages, readBytes); + messages = new byte[MESSAGE_BUFFER_SIZE]; + } - socket.close(); - } - } - catch (final IOException ex) { - System.out.println("Error in read() " + ex.toString()); - } - finally { - try { - serversocket.close(); - } - catch (final IOException e) { - System.out.println("Error in read()" + e.toString()); - } - } - } + socket.close(); + } + } catch (final IOException ex) { + System.out.println("Error in read() " + ex.toString()); + } finally { + try { + serversocket.close(); + } catch (final IOException e) { + System.out.println("Error in read()" + e.toString()); + } + } + } - private void open() throws IOException { - serversocket = new ServerSocket(listeningPort); - } + private void open() throws IOException { + serversocket = new ServerSocket(listeningPort); + System.out.println("listening on port " + listeningPort); + } - private void putInRingBuffer(final byte[] messages, final int readBytes) { - final long hiseq = ringBuffer.next(); - final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(messages); - valueEvent.setLength(readBytes); - ringBuffer.publish(hiseq); - } + private void putInRingBuffer(final byte[] messages, final int readBytes) { + final long hiseq = ringBuffer.next(); + final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(messages); + valueEvent.setLength(readBytes); + ringBuffer.publish(hiseq); + } - public void terminate(final boolean error) { - System.out.println("Shutdown of TCPReader requested."); - active = false; - } + public void terminate(final boolean error) { + System.out.println("Shutdown of TCPReader requested."); + active = false; + } } diff --git a/src/explorviz/hpc_monitoring/reader/TimeReader.java b/src/explorviz/hpc_monitoring/reader/TimeReader.java index 5a6ec9f..cdfba27 100644 --- a/src/explorviz/hpc_monitoring/reader/TimeReader.java +++ b/src/explorviz/hpc_monitoring/reader/TimeReader.java @@ -1,54 +1,34 @@ package explorviz.hpc_monitoring.reader; -import java.util.concurrent.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public final class TimeReader { - private final long period; - - private boolean terminated = false; - - private final ScheduledExecutorService executorService; - private ScheduledFuture<?> result; - - public TimeReader(final long periodInNanoSec) { - period = periodInNanoSec; - executorService = new ScheduledThreadPoolExecutor(1); - } - - public void read() { - result = executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - sendTimestampEvent(); - } - }, 0, period, TimeUnit.NANOSECONDS); - try { - result.get(); - } - catch (final ExecutionException ex) { - terminate(); - } - catch (final InterruptedException ignore) {} - catch (final CancellationException ignore) {} - terminate(); - } - - protected void sendTimestampEvent() { - // final long timestamp = System.nanoTime(); - // super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp); - } - - public void terminate() { - if (!terminated) { - executorService.shutdown(); - try { - terminated = executorService.awaitTermination(5, - TimeUnit.SECONDS); - } - catch (final InterruptedException ex) {} - if (!terminated) { - result.cancel(true); - } - } - } + private final long period; + + private final ScheduledExecutorService executorService; + + private final IPeriodicTimeSignalReceiver receiver; + + public TimeReader(final long periodInNanoSec, + final IPeriodicTimeSignalReceiver receiver) { + period = periodInNanoSec; + this.receiver = receiver; + executorService = new ScheduledThreadPoolExecutor(1); + } + + public void start() { + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + sendTimestampEvent(); + } + }, 0, period, TimeUnit.NANOSECONDS); + } + + protected void sendTimestampEvent() { + final long timestamp = System.nanoTime(); + receiver.periodicTimeSignal(timestamp); + } } diff --git a/src/explorviz/worker/main/WorkerController.java b/src/explorviz/worker/main/WorkerController.java index 0e138e4..bf0fda8 100644 --- a/src/explorviz/worker/main/WorkerController.java +++ b/src/explorviz/worker/main/WorkerController.java @@ -4,8 +4,8 @@ import explorviz.hpc_monitoring.reader.TCPReader; public class WorkerController { - public void start() { - final TCPReader tcpReader = new TCPReader(10133); - tcpReader.read(); - } + public void start() { + final TCPReader tcpReader = new TCPReader(10133, null); + tcpReader.read(); + } } -- GitLab