diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index f840b1584c0c0b07307ffdba65207f31a46b2b1e..d43634830d5640ebe9977c118c4b8885ba5213cb 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -1,8 +1,7 @@ package explorviz.hpc_monitoring.filter.reconstruction; -import java.util.Comparator; -import java.util.Iterator; -import java.util.TreeSet; +import java.util.ArrayList; +import java.util.List; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; @@ -12,22 +11,22 @@ import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; import explorviz.hpc_monitoring.record.trace.Trace; public class TraceBuffer { - private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); + // private static final Comparator<AbstractOperationEvent> COMPARATOR = new + // AbstractOperationEventComperator(); + + private static final int INITIAL_EVENT_CAPACITY = 100; private HostApplicationMetaData traceMetadata; - private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( - COMPARATOR); + private final List<AbstractOperationEvent> events = new ArrayList<AbstractOperationEvent>( + INITIAL_EVENT_CAPACITY); private boolean closeable; private boolean damaged; private int openEvents; - private int maxOrderIndex = -1; - private long maxLoggingTimestamp = -1; - public final long getMaxLoggingTimestamp() { - return maxLoggingTimestamp; - } + private long maxLoggingTimestamp = -1; + private int maxOrderIndex = -1; public final void insertEvent(final AbstractOperationEvent event) { setMaxLoggingTimestamp(event); @@ -44,13 +43,17 @@ public class TraceBuffer { openEvents--; } - if (!events.add(event)) { + if (!events.add(event)) { // TODO System.out.println("Duplicate entry for orderIndex " + orderIndex + " with traceId " + event.getTraceId()); damaged = true; } } + public final long getMaxLoggingTimestamp() { + return maxLoggingTimestamp; + } + private final void setMaxLoggingTimestamp(final AbstractOperationEvent event) { final long loggingTimestamp = event.getLoggingTimestamp(); if (loggingTimestamp > maxLoggingTimestamp) { @@ -75,32 +78,24 @@ public class TraceBuffer { } public final boolean isFinished() { - return !isInvalid() && closeable; + return !isInvalid(); } public final boolean isInvalid() { - return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size()) || events.isEmpty() || damaged); + return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()) + || damaged || !closeable); } public final Trace toTrace() { - final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events.size()]; - final Iterator<AbstractOperationEvent> iterator = events.iterator(); - int index = 0; - while (iterator.hasNext()) { - arrayEvents[index] = iterator.next(); - index++; - } - - // TODO set runtimes - - return new Trace(traceMetadata, arrayEvents); + return new Trace(traceMetadata, events); } - private static final class AbstractOperationEventComperator implements - Comparator<AbstractOperationEvent> { - @Override - public int compare(final AbstractOperationEvent o1, final AbstractOperationEvent o2) { - return o1.getOrderIndex() - o2.getOrderIndex(); - } - } + // private static final class AbstractOperationEventComperator implements + // Comparator<AbstractOperationEvent> { + // @Override + // public int compare(final AbstractOperationEvent o1, final + // AbstractOperationEvent o2) { + // return o1.getOrderIndex() - o2.getOrderIndex(); + // } + // } } diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index 6029fe6c968eba5d7554b442df5316303106d297..797489cfd7998277c8f17448c805a85ec9347787 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -7,15 +7,18 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; +import explorviz.hpc_monitoring.disruptor.RecordArrayEventFactory; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; +import explorviz.hpc_monitoring.reader.TimeProvider; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TerminateRecord; @@ -25,6 +28,7 @@ import explorviz.hpc_monitoring.record.trace.Trace; public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent> { private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256; + private static final int RINGBUFFER_LENGTH = 32; private static final CountingThroughputFilter counter = new CountingThroughputFilter( "Reconstructed traces/sec"); @@ -43,11 +47,12 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray final ExecutorService exec = Executors.newCachedThreadPool(); final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - RecordArrayEvent.EVENT_FACTORY, 32, exec); + new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, exec); @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000 * 1000, endReceiver); + eventHandlers[0] = new TracePatternSummarizationFilter(TimeUnit.SECONDS.toNanos(1), + endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @@ -70,12 +75,13 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray } private void sendOutValidTrace(final Trace trace) { - // putInRingBuffer(trace); + trace.setValid(true); + putInRingBuffer(trace); } private void sendOutInvalidTrace(final Trace trace) { - // putInRingBuffer(trace); - System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId()); + putInRingBuffer(trace); + System.out.println("Invalid trace: " + trace.getTraceEvents().get(0).getTraceId()); } private void putInRingBuffer(final IRecord message) { @@ -91,11 +97,13 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray if (outputBufferIndex > 0) { final long hiseq = ringBuffer.next(); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + final IRecord[] oldValues = valueEvent.getValues(); valueEvent.setValues(outputBuffer); valueEvent.setValuesLength(outputBufferIndex); ringBuffer.publish(hiseq); - outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + outputBuffer = oldValues; + outputBufferIndex = 0; } } @@ -121,7 +129,7 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray sendOutValidTrace(traceBuffer.toTrace()); } } else if (record instanceof TimedPeriodRecord) { - // checkForTimeouts(TimeProvider.getCurrentTimestamp()); TODO + checkForTimeouts(TimeProvider.getCurrentTimestamp()); // TODO outputBuffer[outputBufferIndex++] = record; flushOutputBuffer(); } else if (record instanceof TerminateRecord) { diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java index 3d067710eceda254dcf8929f23f56ade3c20b92d..6b90dde65c258c28252706f9cf8d6fb1e4799619 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java @@ -1,42 +1,42 @@ package explorviz.hpc_monitoring.filter.reduction; +import java.util.List; + import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.trace.Trace; public class TraceAggregationBuffer { - private Trace accumulator; - private final long bufferCreatedTimestamp; - - public TraceAggregationBuffer(final long bufferCreatedTimestamp) { - this.bufferCreatedTimestamp = bufferCreatedTimestamp; - } - - public long getBufferCreatedTimestamp() { - return bufferCreatedTimestamp; - } - - public Trace getAggregatedTrace() { - return accumulator; - } - - public void insertTrace(final Trace trace) { - aggregate(trace); - } - - private void aggregate(final Trace trace) { - if (accumulator == null) { - accumulator = trace; - } - else { - final AbstractOperationEvent[] aggregatedRecords = accumulator - .getTraceEvents(); - final AbstractOperationEvent[] records = trace.getTraceEvents(); - for (int i = 0; i < aggregatedRecords.length; i++) { - aggregatedRecords[i].getRuntime() - .merge(records[i].getRuntime()); - } - - accumulator.getRuntime().merge(trace.getRuntime()); - } - } + private Trace accumulator; + private final long bufferCreatedTimestamp; + + public TraceAggregationBuffer(final long bufferCreatedTimestamp) { + this.bufferCreatedTimestamp = bufferCreatedTimestamp; + } + + public long getBufferCreatedTimestamp() { + return bufferCreatedTimestamp; + } + + public Trace getAggregatedTrace() { + return accumulator; + } + + public void insertTrace(final Trace trace) { + aggregate(trace); + } + + private void aggregate(final Trace trace) { + if (accumulator == null) { + accumulator = trace; + } else { + final List<AbstractOperationEvent> aggregatedRecords = accumulator.getTraceEvents(); + final List<AbstractOperationEvent> records = trace.getTraceEvents(); + + for (int i = 0; i < aggregatedRecords.size(); i++) { + aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); + } + + accumulator.getRuntime().merge(trace.getRuntime()); + } + } } \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java index 549119249350cf0c9de06bb2159a0e20812c2872..6fec9317113ac2e0cbe12e718c6d1ed0b367e5cb 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java @@ -1,6 +1,7 @@ package explorviz.hpc_monitoring.filter.reduction; import java.util.Comparator; +import java.util.List; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; @@ -9,11 +10,11 @@ import explorviz.hpc_monitoring.record.trace.Trace; public class TraceComperator implements Comparator<Trace> { @Override public int compare(final Trace t1, final Trace t2) { - final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents(); - final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents(); + final List<AbstractOperationEvent> recordsT1 = t1.getTraceEvents(); + final List<AbstractOperationEvent> recordsT2 = t2.getTraceEvents(); - if ((recordsT1.length - recordsT2.length) != 0) { - return recordsT1.length - recordsT2.length; + if ((recordsT1.size() - recordsT2.size()) != 0) { + return recordsT1.size() - recordsT2.size(); } final int cmpHostnames = t1.getTraceMetadata().getHostname() @@ -28,9 +29,9 @@ public class TraceComperator implements Comparator<Trace> { return cmpApplicationNames; } - for (int i = 0; i < recordsT1.length; i++) { - final AbstractOperationEvent recordT1 = recordsT1[i]; - final AbstractOperationEvent recordT2 = recordsT2[i]; + for (int i = 0; i < recordsT1.size(); i++) { + final AbstractOperationEvent recordT1 = recordsT1.get(i); + final AbstractOperationEvent recordT2 = recordsT2.get(i); final int cmpSignature = recordT1.getOperationSignatureId() - recordT2.getOperationSignatureId(); diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index f410435095301f0f2551771a73fc8f6a79ff39cf..567cdc8ac964a8dc5ee408bd8df84c617ae496c9 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -21,6 +21,8 @@ import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.trace.Trace; public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent> { + private static final int RINGBUFFER_LENGTH = 256; + private final long maxCollectionDuration; private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>( @@ -37,7 +39,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray final ExecutorService exec = Executors.newCachedThreadPool(); final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, 256, exec); + RecordEvent.EVENT_FACTORY, RINGBUFFER_LENGTH, exec); @SuppressWarnings("unchecked") final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; @@ -57,7 +59,12 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray for (int i = 0; i < valuesLength; i++) { final IRecord record = values[i]; if (record instanceof Trace) { - insertIntoBuffer((Trace) record); + final Trace trace = (Trace) record; + if (trace.isValid()) { + insertIntoBuffer(trace); + } else { + sendOutTrace(trace); + } } else if (record instanceof TimedPeriodRecord) { processTimeoutQueue(TimeProvider.getCurrentTimestamp()); } else if (record instanceof TerminateRecord) { diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index e64d8bb2c88bf615f6b1235c86521f90d0777366..1f1d39e13f5515fca2f224baa5327e452ac8f85c 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -7,16 +7,22 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; +import explorviz.hpc_monitoring.disruptor.RecordArrayEventFactory; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; public final class TCPReader { + static final int MESSAGE_BUFFER_SIZE = 131072; + static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384; + private static final int RINGBUFFER_LENGTH = 16; + private final int listeningPort; private boolean active = true; @@ -31,11 +37,11 @@ public final class TCPReader { final ExecutorService exec = Executors.newCachedThreadPool(); final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - RecordArrayEvent.EVENT_FACTORY, 16, exec); + new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, exec); @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(2 * 1000 * 1000, endReceiver); + eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(2), endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java index d60498e76432f741194820dbb47888e695081dba..ec9533cfea164ce888c1d6d76ce3f45a7256a7fd 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import com.lmax.disruptor.RingBuffer; @@ -21,9 +22,6 @@ import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiver { - private static final int MESSAGE_BUFFER_SIZE = 131072; - private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384; - private HostApplicationMetaData hostApplicationMetadata; private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); @@ -33,9 +31,9 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv "Received records/sec in Reader" + Thread.currentThread().getId()); private final SocketChannel socketChannel; - private final RingBuffer<RecordArrayEvent> ringBuffer; - private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + + private IRecord[] outputBuffer = new IRecord[TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE]; private int outputBufferIndex = 0; public TCPReaderThread(final SocketChannel socketChannel, @@ -43,12 +41,12 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv this.socketChannel = socketChannel; this.ringBuffer = ringBuffer; - new TimeSignalReader(1 * 1000, this).start(); + new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start(); } @Override public void run() { - final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + final ByteBuffer buffer = ByteBuffer.allocateDirect(TCPReader.MESSAGE_BUFFER_SIZE); try { while ((socketChannel.read(buffer)) != -1) { buffer.flip(); @@ -62,14 +60,16 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv @Override public void periodicTimeSignal(final long timestamp) { // TODO flush out buffer! - final IRecord[] buffer = new IRecord[1]; - buffer[0] = new TimedPeriodRecord(); final long hiseq = ringBuffer.next(); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + final IRecord[] buffer = valueEvent.getValues(); + buffer[0] = new TimedPeriodRecord(); + valueEvent.setValues(buffer); valueEvent.setValuesLength(1); valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); } @@ -267,7 +267,7 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv counter.inputObjects(message); outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) { flushOutputBuffer(); } } @@ -276,12 +276,15 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv if (outputBufferIndex > 0) { final long hiseq = ringBuffer.next(); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + final IRecord[] oldValues = valueEvent.getValues(); + valueEvent.setValues(outputBuffer); valueEvent.setValuesLength(outputBufferIndex); valueEvent.setMetadata(hostApplicationMetadata); ringBuffer.publish(hiseq); - outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + outputBuffer = oldValues; + outputBufferIndex = 0; } }