diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index d43634830d5640ebe9977c118c4b8885ba5213cb..00113367b67efd19cb02bf8ba69d32bed4d66afc 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -87,7 +87,7 @@ public class TraceBuffer { } public final Trace toTrace() { - return new Trace(traceMetadata, events); + return new Trace(traceMetadata, events, events.get(0).getTraceId()); } // private static final class AbstractOperationEventComperator implements diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index 797489cfd7998277c8f17448c805a85ec9347787..54a8c722e47e5eb4650f667c37a6ac71f1246f7f 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -5,18 +5,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; - -import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; -import explorviz.hpc_monitoring.disruptor.RecordArrayEventFactory; -import explorviz.hpc_monitoring.disruptor.RecordEvent; -import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.filter.AbstractFilter; +import explorviz.hpc_monitoring.filter.IReceiver; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; import explorviz.hpc_monitoring.reader.TimeProvider; import explorviz.hpc_monitoring.record.HostApplicationMetaData; @@ -26,35 +18,63 @@ import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.trace.Trace; -public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent> { - private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256; +public final class TraceReconstructionFilter extends AbstractFilter { private static final int RINGBUFFER_LENGTH = 32; - - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Reconstructed traces/sec"); + private static final int OUTPUT_BATCH_SIZE = 256; private final long maxTraceTimeout; private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceBuffer>(); - private final RingBuffer<RecordArrayEvent> ringBuffer; - private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; - private int outputBufferIndex = 0; - - public TraceReconstructionFilter(final long maxTraceTimeout, - final EventHandler<RecordEvent> endReceiver) { + public TraceReconstructionFilter(final long maxTraceTimeout, final IReceiver sinkReceiver) { + super( + new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), + sinkReceiver), RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, + "Reconstructed traces/sec"); this.maxTraceTimeout = maxTraceTimeout; + } - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, exec); + @Override + public void processRecord(final IRecord record, + final HostApplicationMetaData hostApplicationMetaData) { + if (record instanceof AbstractOperationEvent) { + final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + + final long traceId = abstractOperationEvent.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId( + abstractOperationEvent.getTraceId(), hostApplicationMetaData); + traceBuffer.insertEvent(abstractOperationEvent); + + if (traceBuffer.isFinished()) { + traceId2trace.remove(traceId); + sendOutValidTrace(traceBuffer.toTrace()); + } + } else if (record instanceof Trace) { + final Trace trace = (Trace) record; + if (trace.isValid()) { + deliver(trace); + } else { + getBufferForTraceId(trace.getTraceId(), trace.getHostMetadata()); + } + } else if (record instanceof TimedPeriodRecord) { + checkForTimeouts(TimeProvider.getCurrentTimestamp()); + periodicFlush(record); + } else if (record instanceof TerminateRecord) { + terminate(); + } else { + deliver(record); + } + } - @SuppressWarnings("unchecked") - final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(TimeUnit.SECONDS.toNanos(1), - endReceiver); - disruptor.handleEventsWith(eventHandlers); - ringBuffer = disruptor.start(); + private TraceBuffer getBufferForTraceId(final long traceId, + final HostApplicationMetaData metadata) { + TraceBuffer traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { + traceBuffer = new TraceBuffer(); + traceBuffer.setTrace(metadata); + traceId2trace.put(traceId, traceBuffer); + } + return traceBuffer; } private void checkForTimeouts(final long timestamp) { @@ -76,77 +96,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray private void sendOutValidTrace(final Trace trace) { trace.setValid(true); - putInRingBuffer(trace); + deliver(trace); } private void sendOutInvalidTrace(final Trace trace) { - putInRingBuffer(trace); - System.out.println("Invalid trace: " + trace.getTraceEvents().get(0).getTraceId()); - } - - private void putInRingBuffer(final IRecord message) { - counter.inputObjects(message); - outputBuffer[outputBufferIndex++] = message; - - if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - flushOutputBuffer(); - } - } - - private void flushOutputBuffer() { - if (outputBufferIndex > 0) { - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - final IRecord[] oldValues = valueEvent.getValues(); - valueEvent.setValues(outputBuffer); - valueEvent.setValuesLength(outputBufferIndex); - ringBuffer.publish(hiseq); - - outputBuffer = oldValues; - - outputBufferIndex = 0; - } - } - - @Override - public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch) - throws Exception { - final IRecord[] values = event.getValues(); - final int valuesLength = event.getValuesLength(); - - for (int i = 0; i < valuesLength; i++) { - final IRecord record = values[i]; - if (record instanceof AbstractOperationEvent) { - final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); - - final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId( - abstractOperationEvent.getTraceId(), event.getMetadata()); - traceBuffer.insertEvent(abstractOperationEvent); - - if (traceBuffer.isFinished()) { - traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace()); - } - } else if (record instanceof TimedPeriodRecord) { - checkForTimeouts(TimeProvider.getCurrentTimestamp()); // TODO - outputBuffer[outputBufferIndex++] = record; - flushOutputBuffer(); - } else if (record instanceof TerminateRecord) { - terminate(); - } - } - } - - private TraceBuffer getBufferForTraceId(final long traceId, - final HostApplicationMetaData metadata) { - TraceBuffer traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { - traceBuffer = new TraceBuffer(); - traceBuffer.setTrace(metadata); - traceId2trace.put(traceId, traceBuffer); - } - return traceBuffer; + deliver(trace); } private void terminate() { diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java index f5749d1c4749177aad70c8e7ecae3b495e3cc8ea..6b90dde65c258c28252706f9cf8d6fb1e4799619 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java @@ -1,5 +1,8 @@ package explorviz.hpc_monitoring.filter.reduction; +import java.util.List; + +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.trace.Trace; public class TraceAggregationBuffer { @@ -26,14 +29,12 @@ public class TraceAggregationBuffer { if (accumulator == null) { accumulator = trace; } else { - // final List<AbstractOperationEvent> aggregatedRecords = // TODO - // accumulator.getTraceEvents(); - // final List<AbstractOperationEvent> records = - // trace.getTraceEvents(); - // - // for (int i = 0; i < aggregatedRecords.size(); i++) { - // aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); - // } + final List<AbstractOperationEvent> aggregatedRecords = accumulator.getTraceEvents(); + final List<AbstractOperationEvent> records = trace.getTraceEvents(); + + for (int i = 0; i < aggregatedRecords.size(); i++) { + aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); + } accumulator.getRuntime().merge(trace.getRuntime()); } diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java deleted file mode 100644 index 6fec9317113ac2e0cbe12e718c6d1ed0b367e5cb..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java +++ /dev/null @@ -1,59 +0,0 @@ -package explorviz.hpc_monitoring.filter.reduction; - -import java.util.Comparator; -import java.util.List; - -import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.trace.Trace; - -public class TraceComperator implements Comparator<Trace> { - @Override - public int compare(final Trace t1, final Trace t2) { - final List<AbstractOperationEvent> recordsT1 = t1.getTraceEvents(); - final List<AbstractOperationEvent> recordsT2 = t2.getTraceEvents(); - - if ((recordsT1.size() - recordsT2.size()) != 0) { - return recordsT1.size() - recordsT2.size(); - } - - final int cmpHostnames = t1.getTraceMetadata().getHostname() - .compareTo(t2.getTraceMetadata().getHostname()); - if (cmpHostnames != 0) { - return cmpHostnames; - } - - final int cmpApplicationNames = t1.getTraceMetadata().getApplication() - .compareTo(t2.getTraceMetadata().getApplication()); - if (cmpApplicationNames != 0) { - return cmpApplicationNames; - } - - for (int i = 0; i < recordsT1.size(); i++) { - final AbstractOperationEvent recordT1 = recordsT1.get(i); - final AbstractOperationEvent recordT2 = recordsT2.get(i); - - final int cmpSignature = recordT1.getOperationSignatureId() - - recordT2.getOperationSignatureId(); - if (cmpSignature != 0) { - return cmpSignature; - } - - final int cmpClass = recordT1.getClass().getName() - .compareTo(recordT2.getClass().getName()); - if (cmpClass != 0) { - return cmpClass; - } - - if (recordT1 instanceof AfterFailedOperationEvent) { - final int cmpError = ((AfterFailedOperationEvent) recordT1).getCause().compareTo( - ((AfterFailedOperationEvent) recordT2).getCause()); - if (cmpError != 0) { - return cmpClass; - } - } - } - - return 0; - } -} diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 567cdc8ac964a8dc5ee408bd8df84c617ae496c9..f7989aa35bf5f8b944ec79f189b538d1e3de4d02 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -4,72 +4,50 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; - -import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; -import explorviz.hpc_monitoring.disruptor.RecordEvent; -import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.filter.AbstractFilter; +import explorviz.hpc_monitoring.filter.IReceiver; import explorviz.hpc_monitoring.reader.TimeProvider; +import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TerminateRecord; import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.trace.Trace; +import explorviz.hpc_monitoring.record.trace.TraceComperator; -public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent> { +public class TracePatternSummarizationFilter extends AbstractFilter { private static final int RINGBUFFER_LENGTH = 256; + private static final int OUTPUT_BATCH_SIZE = 16; private final long maxCollectionDuration; private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>( new TraceComperator()); - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Reduced traces/sec"); - - private final RingBuffer<RecordEvent> ringBuffer; - public TracePatternSummarizationFilter(final long maxCollectionDuration, - final EventHandler<RecordEvent> endReceiver) { - this.maxCollectionDuration = maxCollectionDuration; - - final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, RINGBUFFER_LENGTH, exec); + final IReceiver sinkReceiver) { + super(sinkReceiver, RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, "Reduced traces / sec"); - @SuppressWarnings("unchecked") - final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = endReceiver; - if (endReceiver != null) { - disruptor.handleEventsWith(eventHandlers); - } - ringBuffer = disruptor.start(); + this.maxCollectionDuration = maxCollectionDuration; } @Override - public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch) - throws Exception { - final IRecord[] values = event.getValues(); - final int valuesLength = event.getValuesLength(); - - for (int i = 0; i < valuesLength; i++) { - final IRecord record = values[i]; - if (record instanceof Trace) { - final Trace trace = (Trace) record; - if (trace.isValid()) { - insertIntoBuffer(trace); - } else { - sendOutTrace(trace); - } - } else if (record instanceof TimedPeriodRecord) { - processTimeoutQueue(TimeProvider.getCurrentTimestamp()); - } else if (record instanceof TerminateRecord) { - terminate(); + public void processRecord(final IRecord record, + final HostApplicationMetaData hostApplicationMetaData) { + if (record instanceof Trace) { + final Trace trace = (Trace) record; + if (trace.isValid()) { + insertIntoBuffer(trace); + } else { + deliver(trace); } + } else if (record instanceof TimedPeriodRecord) { + processTimeoutQueue(TimeProvider.getCurrentTimestamp()); + periodicFlush(record); + } else if (record instanceof TerminateRecord) { + terminate(); + } else { + deliver(record); } } @@ -88,7 +66,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { final Trace aggregatedTrace = traceBuffer.getAggregatedTrace(); - sendOutTrace(aggregatedTrace); + deliver(aggregatedTrace); toRemove.add(aggregatedTrace); } } @@ -97,17 +75,9 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray } } - private void sendOutTrace(final Trace aggregatedTrace) { - counter.inputObjects(aggregatedTrace); - final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(aggregatedTrace); - ringBuffer.publish(hiseq); - } - private void terminate() { for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { - sendOutTrace(traceBuffer.getAggregatedTrace()); + deliver(traceBuffer.getAggregatedTrace()); } trace2buffer.clear(); } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 1f1d39e13f5515fca2f224baa5327e452ac8f85c..045976106619a6dea1c8f1c074758323fd671e30 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -15,11 +15,12 @@ import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordArrayEventFactory; -import explorviz.hpc_monitoring.disruptor.RecordEvent; +import explorviz.hpc_monitoring.filter.IReceiver; import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; public final class TCPReader { static final int MESSAGE_BUFFER_SIZE = 131072; + static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384; private static final int RINGBUFFER_LENGTH = 16; @@ -30,9 +31,9 @@ public final class TCPReader { private final RingBuffer<RecordArrayEvent> ringBuffer; - private final List<Thread> threads = new ArrayList<Thread>(); + private final List<TCPReaderThread> threads = new ArrayList<TCPReaderThread>(); - public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) { + public TCPReader(final int listeningPort, final IReceiver endReceiver) { this.listeningPort = listeningPort; final ExecutorService exec = Executors.newCachedThreadPool(); @@ -50,7 +51,8 @@ public final class TCPReader { try { open(); while (active) { - final Thread thread = new TCPReaderThread(serversocket.accept(), ringBuffer); + final TCPReaderThread thread = new TCPReaderThread(serversocket.accept(), + ringBuffer); thread.start(); threads.add(thread); } @@ -75,5 +77,8 @@ public final class TCPReader { public final void terminate(final boolean error) { System.out.println("Shutdown of TCPReader requested."); active = false; + for (final TCPReaderThread thread : threads) { + thread.terminate(); + } } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java index ec9533cfea164ce888c1d6d76ce3f45a7256a7fd..4396424788eeee5712c4cd32b4eefafbae7db00e 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -292,12 +292,14 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private final void addToRegistry(final int key, final String value) { stringRegistry.put(key, value); - // System.out.println("put key " + key + " value " + value); - checkWaitingMessages(); } private final String getStringFromRegistry(final int id) { return stringRegistry.get(id); } + + public void terminate() { + // TODO + } }