diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs index c40d9e31ece672c7ebf1e342f4c37c8b4324c4f1..f5f092144dc8d9ba25026576ebcbdd36f630fca1 100644 --- a/.settings/org.eclipse.jdt.core.prefs +++ b/.settings/org.eclipse.jdt.core.prefs @@ -266,7 +266,7 @@ org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false -org.eclipse.jdt.core.formatter.lineSplit=80 +org.eclipse.jdt.core.formatter.lineSplit=100 org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0 diff --git a/src/explorviz/hpc_monitoring/connector/TCPConnector.java b/src/explorviz/hpc_monitoring/connector/TCPConnector.java deleted file mode 100644 index e2e3c0945863f50acd23d503cfa6bea63195241f..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/connector/TCPConnector.java +++ /dev/null @@ -1,83 +0,0 @@ -package explorviz.hpc_monitoring.connector; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.net.Socket; - -import com.lmax.disruptor.EventHandler; - -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; - -public class TCPConnector implements EventHandler<ByteArrayEvent> { - private static final int MESSAGE_BUFFER_SIZE = 65536; - - private String providerUrl; - private final int providerPort; - - private Socket socket; - - private BufferedOutputStream bufferedOutputStream; - - public TCPConnector(final String providerUrl, final int providerPort) { - this.providerUrl = providerUrl; - this.providerPort = providerPort; - - try { - connect(providerUrl); - } catch (final IOException e) { - e.printStackTrace(); - } - } - - private void connect(final String provider) throws IOException { - socket = new Socket(providerUrl, providerPort); - bufferedOutputStream = new BufferedOutputStream( - socket.getOutputStream(), MESSAGE_BUFFER_SIZE); - } - - public final void sendMessage(final byte[] message) { - try { - bufferedOutputStream.write(message); - // if (endOfBatch) { - // bufferedOutputStream.flush(); - // } - } catch (final IOException e) { - e.printStackTrace(); - } - } - - public final void cleanup() { - disconnect(); - } - - private void disconnect() { - if (socket.isConnected()) { - try { - socket.close(); - } catch (final IOException e) { - System.out.println(e.toString()); - } - } - } - - public void setProvider(final String provider) { - synchronized (this) { - if (!provider.equals(providerUrl)) { - disconnect(); - try { - connect(provider); - providerUrl = provider; - notifyAll(); - } catch (final IOException e) { - e.printStackTrace(); - } - } - } - } - - @Override - public void onEvent(final ByteArrayEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - sendMessage(event.getValue()); - } -} diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index adbffc3155ecb9cf2400ca73723101f64fe78b82..f840b1584c0c0b07307ffdba65207f31a46b2b1e 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -1,21 +1,20 @@ package explorviz.hpc_monitoring.filter.reconstruction; -import java.io.Serializable; import java.util.Comparator; import java.util.Iterator; import java.util.TreeSet; -import explorviz.hpc_monitoring.record.HostApplicationMetadata; -import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; +import explorviz.hpc_monitoring.record.trace.Trace; public class TraceBuffer { private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); - private HostApplicationMetadata traceMetadata; + private HostApplicationMetaData traceMetadata; private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( COMPARATOR); @@ -46,8 +45,8 @@ public class TraceBuffer { } if (!events.add(event)) { - System.out.println("Duplicate entry for orderIndex " + orderIndex - + " with traceId " + event.getTraceId()); + System.out.println("Duplicate entry for orderIndex " + orderIndex + " with traceId " + + event.getTraceId()); damaged = true; } } @@ -67,7 +66,7 @@ public class TraceBuffer { return orderIndex; } - public void setTrace(final HostApplicationMetadata trace) { + public void setTrace(final HostApplicationMetaData trace) { if (traceMetadata != null) { damaged = true; return; @@ -80,13 +79,11 @@ public class TraceBuffer { } public final boolean isInvalid() { - return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size()) - || events.isEmpty() || damaged); + return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size()) || events.isEmpty() || damaged); } public final Trace toTrace() { - final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events - .size()]; + final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events.size()]; final Iterator<AbstractOperationEvent> iterator = events.iterator(); int index = 0; while (iterator.hasNext()) { @@ -94,19 +91,15 @@ public class TraceBuffer { index++; } + // TODO set runtimes + return new Trace(traceMetadata, arrayEvents); } - /** - * @author Jan Waller - */ private static final class AbstractOperationEventComperator implements - Comparator<AbstractOperationEvent>, Serializable { - private static final long serialVersionUID = 8920737343446332517L; - + Comparator<AbstractOperationEvent> { @Override - public int compare(final AbstractOperationEvent o1, - final AbstractOperationEvent o2) { + public int compare(final AbstractOperationEvent o1, final AbstractOperationEvent o2) { return o1.getOrderIndex() - o2.getOrderIndex(); } } diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index d76e7f6e8f99e7f8c80a7940dd49d0da66d374ab..d92be65c869dfc39a509002c8d55864047bda67a 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -1,8 +1,10 @@ package explorviz.hpc_monitoring.filter.reconstruction; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -15,22 +17,22 @@ import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; -import explorviz.hpc_monitoring.reader.TimedReader; -import explorviz.hpc_monitoring.record.HostApplicationMetadata; +import explorviz.hpc_monitoring.reader.TimeSignalReader; +import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.trace.Trace; -public final class TraceReconstructionFilter implements - EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver { +public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent>, + IPeriodicTimeSignalReceiver { private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256; private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Reconstructed traces per second"); + "Reconstructed traces/sec"); private final long maxTraceTimeout; - private final Map<Long, TraceBuffer> traceId2trace = new TreeMap<Long, TraceBuffer>(); + private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceBuffer>(); private final RingBuffer<RecordArrayEvent> ringBuffer; private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; @@ -46,30 +48,36 @@ public final class TraceReconstructionFilter implements @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, - endReceiver); + eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - new TimedReader(1 * 1000, this).start(); + new TimeSignalReader(1 * 1000, this).start(); } @Override public void periodicTimeSignal(final long timestamp) { - checkForTimeouts(timestamp); // TODO comes from other thread - - // synchronize! - flushOutputBuffer(); + synchronized (this) { + checkForTimeouts(timestamp); + flushOutputBuffer(); + } } private void checkForTimeouts(final long timestamp) { final long traceTimeout = timestamp - maxTraceTimeout; + final List<Long> traceIdsToRemove = new ArrayList<Long>(); + for (final Entry<Long, TraceBuffer> entry : traceId2trace.entrySet()) { final TraceBuffer traceBuffer = entry.getValue(); if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { sendOutInvalidTrace(traceBuffer.toTrace()); - // TODO remove from traceId2trace + traceIdsToRemove.add(entry.getKey()); } } + + for (final Long traceIdToRemove : traceIdsToRemove) { + traceId2trace.remove(traceIdToRemove); + } } private void sendOutValidTrace(final Trace trace) { @@ -78,13 +86,12 @@ public final class TraceReconstructionFilter implements private void sendOutInvalidTrace(final Trace trace) { // putInRingBuffer(trace); // TODO - System.out.println("Invalid trace: " - + trace.getTraceEvents()[0].getTraceId()); + System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId()); } private void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - synchronized (this) { // TODO remove + synchronized (this) { outputBuffer[outputBufferIndex++] = message; if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { @@ -94,61 +101,57 @@ public final class TraceReconstructionFilter implements } private void flushOutputBuffer() { - synchronized (this) { // TODO remove - if (outputBufferIndex > 0) { - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValues(outputBuffer); - ringBuffer.publish(hiseq); - - outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO - // object - // reusage? - outputBufferIndex = 0; - } + if (outputBufferIndex > 0) { + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValues(outputBuffer); + valueEvent.setValuesLength(outputBufferIndex); + ringBuffer.publish(hiseq); + + outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + outputBufferIndex = 0; } } @Override - public void onEvent(final RecordArrayEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - for (final IRecord record : event.getValues()) { // TODO save length in - // event - if (record != null) { - final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); - - final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId( - abstractOperationEvent.getTraceId(), - event.getMetadata()); - traceBuffer.insertEvent(abstractOperationEvent); - - if (traceBuffer.isFinished()) { - traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace()); - } + public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch) + throws Exception { + final IRecord[] values = event.getValues(); + final int valuesLength = event.getValuesLength(); + + for (int i = 0; i < valuesLength; i++) { + final IRecord record = values[i]; + final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + + final long traceId = abstractOperationEvent.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId( + abstractOperationEvent.getTraceId(), event.getMetadata()); + traceBuffer.insertEvent(abstractOperationEvent); + + if (traceBuffer.isFinished()) { + traceId2trace.remove(traceId); + sendOutValidTrace(traceBuffer.toTrace()); } } } private TraceBuffer getBufferForTraceId(final long traceId, - final HostApplicationMetadata metadata) { + final HostApplicationMetaData metadata) { TraceBuffer traceBuffer = traceId2trace.get(traceId); if (traceBuffer == null) { - traceBuffer = new TraceBuffer(); // TODO dont create new - keep old - // ones and reset! - traceBuffer.setTrace(metadata); // TODO reuse... + traceBuffer = new TraceBuffer(); + traceBuffer.setTrace(metadata); traceId2trace.put(traceId, traceBuffer); } return traceBuffer; } public void terminate() { - for (final Object entry : traceId2trace.values()) { - if (entry instanceof TraceBuffer) { - sendOutInvalidTrace(((TraceBuffer) entry).toTrace()); + synchronized (this) { + for (final TraceBuffer entry : traceId2trace.values()) { + sendOutInvalidTrace(entry.toTrace()); } + traceId2trace.clear(); } - traceId2trace.clear(); } } \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java index 80e97a53f7a8bdfa1996eb38f3bed8307b4190d0..3d067710eceda254dcf8929f23f56ade3c20b92d 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java @@ -1,7 +1,7 @@ package explorviz.hpc_monitoring.filter.reduction; -import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.trace.Trace; public class TraceAggregationBuffer { private Trace accumulator; diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java new file mode 100644 index 0000000000000000000000000000000000000000..549119249350cf0c9de06bb2159a0e20812c2872 --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceComperator.java @@ -0,0 +1,58 @@ +package explorviz.hpc_monitoring.filter.reduction; + +import java.util.Comparator; + +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.trace.Trace; + +public class TraceComperator implements Comparator<Trace> { + @Override + public int compare(final Trace t1, final Trace t2) { + final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents(); + final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents(); + + if ((recordsT1.length - recordsT2.length) != 0) { + return recordsT1.length - recordsT2.length; + } + + final int cmpHostnames = t1.getTraceMetadata().getHostname() + .compareTo(t2.getTraceMetadata().getHostname()); + if (cmpHostnames != 0) { + return cmpHostnames; + } + + final int cmpApplicationNames = t1.getTraceMetadata().getApplication() + .compareTo(t2.getTraceMetadata().getApplication()); + if (cmpApplicationNames != 0) { + return cmpApplicationNames; + } + + for (int i = 0; i < recordsT1.length; i++) { + final AbstractOperationEvent recordT1 = recordsT1[i]; + final AbstractOperationEvent recordT2 = recordsT2[i]; + + final int cmpSignature = recordT1.getOperationSignatureId() + - recordT2.getOperationSignatureId(); + if (cmpSignature != 0) { + return cmpSignature; + } + + final int cmpClass = recordT1.getClass().getName() + .compareTo(recordT2.getClass().getName()); + if (cmpClass != 0) { + return cmpClass; + } + + if (recordT1 instanceof AfterFailedOperationEvent) { + final int cmpError = ((AfterFailedOperationEvent) recordT1).getCause().compareTo( + ((AfterFailedOperationEvent) recordT2).getCause()); + if (cmpError != 0) { + return cmpClass; + } + } + } + + return 0; + } +} diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 2ec59f9b42f1b8a2eb545e1d70c7eb60e350d584..0820bd96caed85f5da6f1ecf5241676989604c6c 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -1,10 +1,9 @@ package explorviz.hpc_monitoring.filter.reduction; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -16,20 +15,20 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; -import explorviz.hpc_monitoring.reader.TimedReader; +import explorviz.hpc_monitoring.reader.TimeProvider; +import explorviz.hpc_monitoring.reader.TimeSignalReader; import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.trace.Trace; -public class TracePatternSummarizationFilter implements - EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver { +public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent>, + IPeriodicTimeSignalReceiver { private final long maxCollectionDuration; - private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>( + private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>( new TraceComperator()); private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Reduced trace results per second"); + "Reduced traces/sec"); private final RingBuffer<RecordEvent> ringBuffer; @@ -48,12 +47,37 @@ public class TracePatternSummarizationFilter implements disruptor.handleEventsWith(eventHandlers); } ringBuffer = disruptor.start(); - new TimedReader(1 * 1000, this).start(); + new TimeSignalReader(1 * 1000, this).start(); + } + + @Override + public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch) + throws Exception { + final IRecord[] values = event.getValues(); + final int valuesLength = event.getValuesLength(); + + synchronized (this) { + for (int i = 0; i < valuesLength; i++) { + final IRecord record = values[i]; + insertIntoBuffer((Trace) record); + } + } + } + + private void insertIntoBuffer(final Trace trace) { + TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); + if (traceAggregationBuffer == null) { + traceAggregationBuffer = new TraceAggregationBuffer(TimeProvider.getCurrentTimestamp()); + trace2buffer.put(trace, traceAggregationBuffer); + } + traceAggregationBuffer.insertTrace(trace); } @Override public void periodicTimeSignal(final long timestamp) { - processTimeoutQueue(timestamp); + synchronized (this) { + processTimeoutQueue(timestamp); + } } private void processTimeoutQueue(final long timestamp) { @@ -73,64 +97,16 @@ public class TracePatternSummarizationFilter implements private void sendOutTrace(final Trace aggregatedTrace) { counter.inputObjects(aggregatedTrace); - putInRingBuffer(aggregatedTrace); - } - - private void putInRingBuffer(final IRecord record) { final long hiseq = ringBuffer.next(); final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(record); + valueEvent.setValue(aggregatedTrace); ringBuffer.publish(hiseq); } - @Override - public void onEvent(final RecordArrayEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - for (final IRecord record : event.getValues()) { - if (record != null) { - if (record instanceof Trace) { - insertIntoBuffer((Trace) record); - } - } - } - } - - private void insertIntoBuffer(final Trace trace) { - TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); - if (traceAggregationBuffer == null) { - traceAggregationBuffer = new TraceAggregationBuffer( - System.currentTimeMillis()); - trace2buffer.put(trace, traceAggregationBuffer); - } - traceAggregationBuffer.insertTrace(trace); - } - public void terminate(final boolean error) { for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { sendOutTrace(traceBuffer.getAggregatedTrace()); } trace2buffer.clear(); } - - private static final class TraceComperator implements Comparator<Trace> { - - @Override - public int compare(final Trace t1, final Trace t2) { - final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents(); - final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents(); - - if ((recordsT1.length - recordsT2.length) != 0) { - return recordsT1.length - recordsT2.length; - } - - // final int cmpHostnames = t1.getTraceMetadata().getHostname() - // .compareTo(t2.getTraceMetadata().getHostname()); - // if (cmpHostnames != 0) { - // return cmpHostnames; - // } - - // TODO deep check records - return 0; - } - } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 4ffd104c417e41cbc719ffce08e2a6b811f79778..9289fe2dc5dfa6376b60c178241fde512eced740 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -2,14 +2,9 @@ package explorviz.hpc_monitoring.reader; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -19,36 +14,17 @@ import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; -import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; -import explorviz.hpc_monitoring.record.HostApplicationMetadata; -import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; -public final class TCPReader implements IPeriodicTimeSignalReceiver { - private static final int MESSAGE_BUFFER_SIZE = 131072; - private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384; - - // Settings +public final class TCPReader { private final int listeningPort; private boolean active = true; private ServerSocketChannel serversocket; - private HostApplicationMetadata hostApplicationMetadata; - // Buffers private final RingBuffer<RecordArrayEvent> ringBuffer; - private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; - private int outputBufferIndex = 0; - - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Records per second"); - private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); - private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>( - 1024); + private final List<Thread> threads = new ArrayList<Thread>(); public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) { @@ -60,43 +36,28 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(1 * 1000 * 1000, - endReceiver); + eventHandlers[0] = new TraceReconstructionFilter(2 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - - new TimedReader(1 * 1000, this).start(); - } - - @Override - public void periodicTimeSignal(final long timestamp) { - synchronized (this) { // TODO remove - flushOutputBuffer(); - } } public final void read() { try { open(); while (active) { - // TODO only one connection! - final ByteBuffer buffer = ByteBuffer - .allocateDirect(MESSAGE_BUFFER_SIZE); - final SocketChannel socketChannel = serversocket.accept(); - while ((socketChannel.read(buffer)) != -1) { - buffer.flip(); - messagesfromByteArray(buffer); - } - - serversocket.close(); + final Thread thread = new TCPReaderThread( + serversocket.accept(), ringBuffer); + thread.start(); + threads.add(thread); } + serversocket.close(); } catch (final IOException ex) { - System.out.println("Error in read() " + ex.toString()); + System.out.println("Error in read() " + ex.getMessage()); } finally { try { serversocket.close(); } catch (final IOException e) { - System.out.println("Error in read()" + e.toString()); + System.out.println("Error in read()" + e.getMessage()); } } } @@ -111,206 +72,4 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { System.out.println("Shutdown of TCPReader requested."); active = false; } - - private final void messagesfromByteArray(final ByteBuffer buffer) { - while (buffer.remaining() > 0) { - buffer.mark(); - try { - final byte clazzId = buffer.get(); - - switch (clazzId) { - case HostApplicationMetadata.CLAZZ_ID: { - readInTraceMetadata(buffer); - break; - } - case BeforeOperationEvent.CLAZZ_ID: { - readInBeforeOperationEvent(buffer); - break; - } - case AfterFailedOperationEvent.CLAZZ_ID: { - readInAfterFailedOperationEvent(buffer); - break; - } - case AfterOperationEvent.CLAZZ_ID: { - readInAfterOperationEvent(buffer); - break; - } - case 4: { - final int mapId = buffer.getInt(); - final int stringLength = buffer.getInt(); - - final byte[] stringByteArray = new byte[stringLength]; - - buffer.get(stringByteArray); - - addToRegistry(mapId, new String(stringByteArray)); - break; - } - default: { - System.out.println("unknown class id " + clazzId - + " at offset " + (buffer.position() - 4)); - } - } - } catch (final BufferUnderflowException e) { - buffer.reset(); - buffer.compact(); - return; - } - } - - buffer.clear(); - } - - private final void readInTraceMetadata(final ByteBuffer buffer) { - final int hostnameId = buffer.getInt(); - final int applicationId = buffer.getInt(); - - final String hostname = getStringFromRegistry(hostnameId); - final String application = getStringFromRegistry(applicationId); - - if ((hostname != null) && (application != null)) { - hostApplicationMetadata = new HostApplicationMetadata(hostname, - application); - } else { - final byte[] message = new byte[HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); - } - } - - private final void readInBeforeOperationEvent(final ByteBuffer buffer) { - final long timestamp = buffer.getLong(); - final long traceId = buffer.getLong(); - final int orderIndex = buffer.getInt(); - final int operationId = buffer.getInt(); - - final String operation = getStringFromRegistry(operationId); - - if (operation != null) { - putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, - orderIndex, operation)); - } else { - final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); - } - } - - private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) { - final long timestamp = buffer.getLong(); - final long traceId = buffer.getLong(); - final int orderIndex = buffer.getInt(); - final int operationId = buffer.getInt(); - final int causeId = buffer.getInt(); - - final String operation = getStringFromRegistry(operationId); - final String cause = getStringFromRegistry(causeId); - - if ((operation != null) && (cause != null)) { - putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, - orderIndex, operation, cause)); - } else { - final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); - } - } - - private final void readInAfterOperationEvent(final ByteBuffer buffer) { - final long timestamp = buffer.getLong(); - final long traceId = buffer.getLong(); - final int orderIndex = buffer.getInt(); - final int operationId = buffer.getInt(); - - final String operation = getStringFromRegistry(operationId); - if (operation != null) { - putInRingBuffer(new AfterOperationEvent(timestamp, traceId, - orderIndex, operation)); - } else { - final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); - } - - } - - private final void putInRingBuffer(final IRecord message) { - counter.inputObjects(message); - - synchronized (this) { // TODO remove - outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - flushOutputBuffer(); - } - } - } - - private void flushOutputBuffer() { - if (outputBufferIndex > 0) { - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValues(outputBuffer); - valueEvent.setMetadata(hostApplicationMetadata); - ringBuffer.publish(hiseq); - - outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO - // object - // reusage? - outputBufferIndex = 0; - } - } - - private final void putInWaitingMessages(final byte[] message) { - waitingForStringMessages.add(message); // TODO kill messages if too long - // in queue - } - - private final void checkWaitingMessages() { - final List<byte[]> localWaitingList = new ArrayList<byte[]>(); - for (final byte[] waitingMessage : waitingForStringMessages) { - localWaitingList.add(waitingMessage); - } - waitingForStringMessages.clear(); - - for (final byte[] waitingMessage : localWaitingList) { - final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); - final byte waitingMessageClazzId = buffer.get(); - switch (waitingMessageClazzId) { - case HostApplicationMetadata.CLAZZ_ID: - readInTraceMetadata(buffer); - break; - case BeforeOperationEvent.CLAZZ_ID: - readInBeforeOperationEvent(buffer); - break; - case AfterFailedOperationEvent.CLAZZ_ID: - readInAfterFailedOperationEvent(buffer); - break; - case AfterOperationEvent.CLAZZ_ID: - readInAfterOperationEvent(buffer); - break; - default: - break; - } - } - } - - private final void addToRegistry(final int key, final String value) { - stringRegistry.put(key, value); - - // System.out.println("put key " + key + " value " + value); - - checkWaitingMessages(); - } - - private final String getStringFromRegistry(final int id) { - return stringRegistry.get(id); - } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java new file mode 100644 index 0000000000000000000000000000000000000000..975cd920463e3649927e680c0041c8414af8d327 --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -0,0 +1,266 @@ +package explorviz.hpc_monitoring.reader; + +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import com.lmax.disruptor.RingBuffer; + +import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; +import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.record.HostApplicationMetaData; +import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.StringRegistryRecord; +import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; + +public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiver { + private static final int MESSAGE_BUFFER_SIZE = 131072; + private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384; + + private HostApplicationMetaData hostApplicationMetadata; + + private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); + private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); + + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Received records/sec in Reader" + Thread.currentThread().getId()); + + private final SocketChannel socketChannel; + + private final RingBuffer<RecordArrayEvent> ringBuffer; + private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + private int outputBufferIndex = 0; + + public TCPReaderThread(final SocketChannel socketChannel, + final RingBuffer<RecordArrayEvent> ringBuffer) { + this.socketChannel = socketChannel; + this.ringBuffer = ringBuffer; + + new TimeSignalReader(1 * 1000, this).start(); + } + + @Override + public void run() { + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + try { + while ((socketChannel.read(buffer)) != -1) { + buffer.flip(); + messagesfromByteArray(buffer); + } + } catch (final IOException ex) { + System.out.println("Error in read() " + ex.getMessage()); + } + } + + @Override + public void periodicTimeSignal(final long timestamp) { + synchronized (this) { // TODO remove + flushOutputBuffer(); + } + } + + private final void messagesfromByteArray(final ByteBuffer buffer) { + while (buffer.remaining() > 0) { + buffer.mark(); + try { + final byte clazzId = buffer.get(); + + switch (clazzId) { + case HostApplicationMetaData.CLAZZ_ID: { + readInTraceMetadata(buffer); + break; + } + case BeforeOperationEvent.CLAZZ_ID: { + readInBeforeOperationEvent(buffer); + break; + } + case AfterFailedOperationEvent.CLAZZ_ID: { + readInAfterFailedOperationEvent(buffer); + break; + } + case AfterOperationEvent.CLAZZ_ID: { + readInAfterOperationEvent(buffer); + break; + } + case StringRegistryRecord.CLAZZ_ID: { + final int mapId = buffer.getInt(); + final int stringLength = buffer.getInt(); + + final byte[] stringByteArray = new byte[stringLength]; + + buffer.get(stringByteArray); + + addToRegistry(mapId, new String(stringByteArray)); + break; + } + default: { + System.out.println("unknown class id " + clazzId + " at offset " + + (buffer.position() - 4)); + buffer.reset(); + buffer.compact(); + return; + } + } + } catch (final BufferUnderflowException e) { + buffer.reset(); + buffer.compact(); + return; + } + } + + buffer.clear(); + } + + private final void readInTraceMetadata(final ByteBuffer buffer) { + final int hostnameId = buffer.getInt(); + final int applicationId = buffer.getInt(); + + final String hostname = getStringFromRegistry(hostnameId); + final String application = getStringFromRegistry(applicationId); + + if ((hostname != null) && (application != null)) { + hostApplicationMetadata = new HostApplicationMetaData(hostname, application); + } else { + final byte[] message = new byte[HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } + } + + private final void readInBeforeOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + + if (operation != null) { + putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, orderIndex, operationId, + operation)); + } else { + final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } + } + + private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + final int causeId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + final String cause = getStringFromRegistry(causeId); + + if ((operation != null) && (cause != null)) { + putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, orderIndex, + operationId, operation, cause)); + } else { + final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } + } + + private final void readInAfterOperationEvent(final ByteBuffer buffer) { + final long timestamp = buffer.getLong(); + final long traceId = buffer.getLong(); + final int orderIndex = buffer.getInt(); + final int operationId = buffer.getInt(); + + final String operation = getStringFromRegistry(operationId); + if (operation != null) { + putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex, operationId, + operation)); + } else { + final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + buffer.get(message); + putInWaitingMessages(message); + } + + } + + private final void putInWaitingMessages(final byte[] message) { + waitingForStringMessages.add(message); + } + + private final void checkWaitingMessages() { + final List<byte[]> localWaitingList = new ArrayList<byte[]>(); + for (final byte[] waitingMessage : waitingForStringMessages) { + localWaitingList.add(waitingMessage); + } + waitingForStringMessages.clear(); + + for (final byte[] waitingMessage : localWaitingList) { + final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); + final byte waitingMessageClazzId = buffer.get(); + switch (waitingMessageClazzId) { + case HostApplicationMetaData.CLAZZ_ID: + readInTraceMetadata(buffer); + break; + case BeforeOperationEvent.CLAZZ_ID: + readInBeforeOperationEvent(buffer); + break; + case AfterFailedOperationEvent.CLAZZ_ID: + readInAfterFailedOperationEvent(buffer); + break; + case AfterOperationEvent.CLAZZ_ID: + readInAfterOperationEvent(buffer); + break; + default: + break; + } + } + } + + private final void putInRingBuffer(final IRecord message) { + counter.inputObjects(message); + + synchronized (this) { // TODO remove + outputBuffer[outputBufferIndex++] = message; + if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + flushOutputBuffer(); + } + } + } + + private void flushOutputBuffer() { + if (outputBufferIndex > 0) { + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValues(outputBuffer); + valueEvent.setValuesLength(outputBufferIndex); + valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); + + outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; + outputBufferIndex = 0; + } + } + + private final void addToRegistry(final int key, final String value) { + stringRegistry.put(key, value); + + // System.out.println("put key " + key + " value " + value); + + checkWaitingMessages(); + } + + private final String getStringFromRegistry(final int id) { + return stringRegistry.get(id); + } +}