diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index 19bbac24ab80812c9578bb8c4e05eedb47642097..f50fe0b871801200a9e1ed3cd1fef8d2a6634206 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -20,4 +20,6 @@ explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 explorviz.live_trace_processing.trace_summarization_output_buffer_size=64 -explorviz.live_trace_processing.trace_summarization_disruptor_size=16 \ No newline at end of file +explorviz.live_trace_processing.trace_summarization_disruptor_size=16 + +explorviz.live_trace_processing.sending_buffer_size=65536 \ No newline at end of file diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index b0e79b48677ef9d48e29c1eac85c9df33503c607..5ceb26052a1dd4b6f981dbab5f9cc3a4d9f44669 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -31,6 +31,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord private volatile boolean shouldDisconnect = false; public TCPConnector(final String hostname, final int port, final Configuration configuration) { + buffer.clear(); try { setProviderURL(new URL("http://" + hostname + ":" + port)); } catch (final MalformedURLException e) { @@ -99,7 +100,6 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord while (buffer.hasRemaining()) { socketChannel.write(buffer); } - buffer.clear(); doDisconnectIfNessecary(); } catch (final IOException e) { System.out.println("WARNING: Connection was closed - possible data loss"); @@ -107,6 +107,8 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord socketChannel.close(); } catch (final IOException e1) { } + } finally { + buffer.clear(); } } diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 1a6ecbfb7623e0440075753be97139e09c99b288..2626b1667352e9dbb8f507a7ffe383ce0ad052f3 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -5,14 +5,14 @@ import java.util.List; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.reader.TimeProvider; -import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.event.AbstractEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.trace.Trace; class TraceReconstructionBuffer { - private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( + private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); private boolean closeable; @@ -24,7 +24,7 @@ class TraceReconstructionBuffer { private long lastBufferInsert = -1; private int maxOrderIndex = -1; - public final void insertEvent(final AbstractOperationEventRecord event) { + public final void insertEvent(final AbstractEventRecord event) { updatedInThisPeriod = true; final int orderIndex = setMaxOrderIndex(event); @@ -58,7 +58,7 @@ class TraceReconstructionBuffer { lastBufferInsert = TimeProvider.getCurrentTimestamp(); } - private final int setMaxOrderIndex(final AbstractOperationEventRecord event) { + private final int setMaxOrderIndex(final AbstractEventRecord event) { final int orderIndex = event.getOrderIndex(); if (orderIndex > maxOrderIndex) { maxOrderIndex = orderIndex; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 6c6917b7cdfaf7d1d917bdd34bec5dbf029655a4..17884300e2fa6496eadc5379d3923c17a8cf75f5 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -11,7 +11,7 @@ import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.reduction.ITraceReduction; import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.record.IRecord; -import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.event.AbstractEventRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.Trace; @@ -30,8 +30,8 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I @Override public final void processRecord(final IRecord record) { - if (record instanceof AbstractOperationEventRecord) { - final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); + if (record instanceof AbstractEventRecord) { + final AbstractEventRecord abstractOperationEvent = ((AbstractEventRecord) record); final long traceId = abstractOperationEvent.getTraceId(); final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(abstractOperationEvent diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java index f77249fab14de85131c127f53e08899963c9d749..046c6e8605b642ee994015035b6fc6493faba8cd 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java @@ -2,7 +2,7 @@ package explorviz.live_trace_processing.filter.reduction.summarization; import java.util.List; -import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.event.AbstractEventRecord; import explorviz.live_trace_processing.record.trace.Trace; class TracePatternSummarizationBuffer { @@ -25,9 +25,9 @@ class TracePatternSummarizationBuffer { if (accumulator == null) { accumulator = trace; } else { - final List<AbstractOperationEventRecord> aggregatedRecords = accumulator + final List<AbstractEventRecord> aggregatedRecords = accumulator .getTraceEvents(); - final List<AbstractOperationEventRecord> records = trace.getTraceEvents(); + final List<AbstractEventRecord> records = trace.getTraceEvents(); for (int i = 0; i < aggregatedRecords.size(); i++) { aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index fcabc9391eb53fad1991a2e7f5255320a3e32d6b..0a3933eef93ea92d75b1c9b10a4a06c2c1af54fd 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -10,17 +10,18 @@ import java.util.concurrent.TimeUnit; import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; +import explorviz.live_trace_processing.IdNotAvailableException; import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.record.IRecord; -import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; -import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.record.event.AbstractEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; import explorviz.live_trace_processing.record.trace.Trace; @@ -79,8 +80,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final byte clazzId = buffer.get(); switch (clazzId) { case HostApplicationMetaDataRecord.CLAZZ_ID: { - if (buffer.remaining() >= (HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { - readInTraceMetadata(buffer); + if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) { + readInHostApplicationMetaData(buffer); } else { buffer.position(buffer.position() - 1); buffer.compact(); @@ -165,12 +166,18 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int eventsLength = buffer.getInt(); final int byteLength = buffer.getInt(); if (buffer.remaining() >= byteLength) { - final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( + final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( eventsLength); for (int i = 0; i < eventsLength; i++) { - final AbstractOperationEventRecord eventRecord = AbstractOperationEventRecord - .createFromByteBuffer(buffer, stringRegistry); - events.add(eventRecord); + AbstractEventRecord eventRecord; + try { + eventRecord = AbstractEventRecord.createFromByteBuffer(buffer, + stringRegistry); + events.add(eventRecord); + } catch (final IdNotAvailableException e) { + // should not happen + e.printStackTrace(); + } } putInRingBuffer(new Trace(events, valid)); @@ -199,21 +206,17 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec buffer.clear(); } - private final void readInTraceMetadata(final ByteBuffer buffer) { + private final void readInHostApplicationMetaData(final ByteBuffer buffer) { final int hostnameId = buffer.getInt(); final int applicationId = buffer.getInt(); - final String hostname = stringRegistry.getStringFromId(hostnameId); - final String application = stringRegistry.getStringFromId(applicationId); + try { + final String hostname = stringRegistry.getStringFromId(hostnameId); + final String application = stringRegistry.getStringFromId(applicationId); - if ((hostname != null) && (application != null)) { hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application); - } else { - final byte[] message = new byte[HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); + } catch (final IdNotAvailableException e) { + putInWaitingMessages(buffer, HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID); } } @@ -223,17 +226,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int orderIndex = buffer.getInt(); final int operationId = buffer.getInt(); - final String operation = stringRegistry.getStringFromId(operationId); + try { + final String operation = stringRegistry.getStringFromId(operationId); - if (operation != null) { putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); - } else { - final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); + } catch (final IdNotAvailableException e) { + putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4); } } @@ -244,19 +243,15 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int operationId = buffer.getInt(); final int causeId = buffer.getInt(); - final String operation = stringRegistry.getStringFromId(operationId); - final String cause = stringRegistry.getStringFromId(causeId); + try { + final String operation = stringRegistry.getStringFromId(operationId); + final String cause = stringRegistry.getStringFromId(causeId); - if ((operation != null) && (cause != null)) { putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, operation, cause, hostApplicationMetadata, new RuntimeStatisticInformation( timestamp))); - } else { - final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - - AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); + } catch (final IdNotAvailableException e) { + putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4 + 4); } } @@ -266,15 +261,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int orderIndex = buffer.getInt(); final int operationId = buffer.getInt(); - final String operation = stringRegistry.getStringFromId(operationId); - if (operation != null) { + try { + final String operation = stringRegistry.getStringFromId(operationId); + putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); - } else { - final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); - buffer.get(message); - putInWaitingMessages(message); + } catch (final IdNotAvailableException e) { + putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4); } } @@ -288,7 +281,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec hostApplicationMetadata)); } - private final void putInWaitingMessages(final byte[] message) { + private final void putInWaitingMessages(final ByteBuffer buffer, final int length) { + final byte[] message = new byte[length]; + buffer.position(buffer.position() - length); + buffer.get(message); waitingForStringMessages.add(message); } @@ -304,7 +300,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final byte waitingMessageClazzId = buffer.get(); switch (waitingMessageClazzId) { case HostApplicationMetaDataRecord.CLAZZ_ID: - readInTraceMetadata(buffer); + readInHostApplicationMetaData(buffer); break; case BeforeOperationEventRecord.CLAZZ_ID: readInBeforeOperationEvent(buffer); diff --git a/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java b/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java index bd1df03c04162347ff9b9e2054626f6b356d10b1..3cd10dcbef25f845b22cfca1554799f9032080d4 100644 --- a/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java +++ b/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java @@ -4,8 +4,8 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; -import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; +import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; public class TraceReconstructionBufferTest {