Skip to content
Snippets Groups Projects
Commit 9256ae10 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

new monitoring records

parent 342be7df
No related branches found
No related tags found
No related merge requests found
...@@ -20,4 +20,6 @@ explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32 ...@@ -20,4 +20,6 @@ explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_summarization_output_buffer_size=64 explorviz.live_trace_processing.trace_summarization_output_buffer_size=64
explorviz.live_trace_processing.trace_summarization_disruptor_size=16 explorviz.live_trace_processing.trace_summarization_disruptor_size=16
\ No newline at end of file
explorviz.live_trace_processing.sending_buffer_size=65536
\ No newline at end of file
...@@ -31,6 +31,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord ...@@ -31,6 +31,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
private volatile boolean shouldDisconnect = false; private volatile boolean shouldDisconnect = false;
public TCPConnector(final String hostname, final int port, final Configuration configuration) { public TCPConnector(final String hostname, final int port, final Configuration configuration) {
buffer.clear();
try { try {
setProviderURL(new URL("http://" + hostname + ":" + port)); setProviderURL(new URL("http://" + hostname + ":" + port));
} catch (final MalformedURLException e) { } catch (final MalformedURLException e) {
...@@ -99,7 +100,6 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord ...@@ -99,7 +100,6 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
socketChannel.write(buffer); socketChannel.write(buffer);
} }
buffer.clear();
doDisconnectIfNessecary(); doDisconnectIfNessecary();
} catch (final IOException e) { } catch (final IOException e) {
System.out.println("WARNING: Connection was closed - possible data loss"); System.out.println("WARNING: Connection was closed - possible data loss");
...@@ -107,6 +107,8 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord ...@@ -107,6 +107,8 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
socketChannel.close(); socketChannel.close();
} catch (final IOException e1) { } catch (final IOException e1) {
} }
} finally {
buffer.clear();
} }
} }
......
...@@ -5,14 +5,14 @@ import java.util.List; ...@@ -5,14 +5,14 @@ import java.util.List;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TraceReconstructionBuffer { class TraceReconstructionBuffer {
private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE);
private boolean closeable; private boolean closeable;
...@@ -24,7 +24,7 @@ class TraceReconstructionBuffer { ...@@ -24,7 +24,7 @@ class TraceReconstructionBuffer {
private long lastBufferInsert = -1; private long lastBufferInsert = -1;
private int maxOrderIndex = -1; private int maxOrderIndex = -1;
public final void insertEvent(final AbstractOperationEventRecord event) { public final void insertEvent(final AbstractEventRecord event) {
updatedInThisPeriod = true; updatedInThisPeriod = true;
final int orderIndex = setMaxOrderIndex(event); final int orderIndex = setMaxOrderIndex(event);
...@@ -58,7 +58,7 @@ class TraceReconstructionBuffer { ...@@ -58,7 +58,7 @@ class TraceReconstructionBuffer {
lastBufferInsert = TimeProvider.getCurrentTimestamp(); lastBufferInsert = TimeProvider.getCurrentTimestamp();
} }
private final int setMaxOrderIndex(final AbstractOperationEventRecord event) { private final int setMaxOrderIndex(final AbstractEventRecord event) {
final int orderIndex = event.getOrderIndex(); final int orderIndex = event.getOrderIndex();
if (orderIndex > maxOrderIndex) { if (orderIndex > maxOrderIndex) {
maxOrderIndex = orderIndex; maxOrderIndex = orderIndex;
......
...@@ -11,7 +11,7 @@ import explorviz.live_trace_processing.filter.AbstractFilter; ...@@ -11,7 +11,7 @@ import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction; import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
...@@ -30,8 +30,8 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I ...@@ -30,8 +30,8 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I
@Override @Override
public final void processRecord(final IRecord record) { public final void processRecord(final IRecord record) {
if (record instanceof AbstractOperationEventRecord) { if (record instanceof AbstractEventRecord) {
final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); final AbstractEventRecord abstractOperationEvent = ((AbstractEventRecord) record);
final long traceId = abstractOperationEvent.getTraceId(); final long traceId = abstractOperationEvent.getTraceId();
final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(abstractOperationEvent final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(abstractOperationEvent
......
...@@ -2,7 +2,7 @@ package explorviz.live_trace_processing.filter.reduction.summarization; ...@@ -2,7 +2,7 @@ package explorviz.live_trace_processing.filter.reduction.summarization;
import java.util.List; import java.util.List;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TracePatternSummarizationBuffer { class TracePatternSummarizationBuffer {
...@@ -25,9 +25,9 @@ class TracePatternSummarizationBuffer { ...@@ -25,9 +25,9 @@ class TracePatternSummarizationBuffer {
if (accumulator == null) { if (accumulator == null) {
accumulator = trace; accumulator = trace;
} else { } else {
final List<AbstractOperationEventRecord> aggregatedRecords = accumulator final List<AbstractEventRecord> aggregatedRecords = accumulator
.getTraceEvents(); .getTraceEvents();
final List<AbstractOperationEventRecord> records = trace.getTraceEvents(); final List<AbstractEventRecord> records = trace.getTraceEvents();
for (int i = 0; i < aggregatedRecords.size(); i++) { for (int i = 0; i < aggregatedRecords.size(); i++) {
aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime());
......
...@@ -10,17 +10,18 @@ import java.util.concurrent.TimeUnit; ...@@ -10,17 +10,18 @@ import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
...@@ -79,8 +80,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -79,8 +80,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final byte clazzId = buffer.get(); final byte clazzId = buffer.get();
switch (clazzId) { switch (clazzId) {
case HostApplicationMetaDataRecord.CLAZZ_ID: { case HostApplicationMetaDataRecord.CLAZZ_ID: {
if (buffer.remaining() >= (HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { if (buffer.remaining() >= HostApplicationMetaDataRecord.BYTE_LENGTH) {
readInTraceMetadata(buffer); readInHostApplicationMetaData(buffer);
} else { } else {
buffer.position(buffer.position() - 1); buffer.position(buffer.position() - 1);
buffer.compact(); buffer.compact();
...@@ -165,12 +166,18 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -165,12 +166,18 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int eventsLength = buffer.getInt(); final int eventsLength = buffer.getInt();
final int byteLength = buffer.getInt(); final int byteLength = buffer.getInt();
if (buffer.remaining() >= byteLength) { if (buffer.remaining() >= byteLength) {
final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
eventsLength); eventsLength);
for (int i = 0; i < eventsLength; i++) { for (int i = 0; i < eventsLength; i++) {
final AbstractOperationEventRecord eventRecord = AbstractOperationEventRecord AbstractEventRecord eventRecord;
.createFromByteBuffer(buffer, stringRegistry); try {
events.add(eventRecord); eventRecord = AbstractEventRecord.createFromByteBuffer(buffer,
stringRegistry);
events.add(eventRecord);
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
}
} }
putInRingBuffer(new Trace(events, valid)); putInRingBuffer(new Trace(events, valid));
...@@ -199,21 +206,17 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -199,21 +206,17 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
buffer.clear(); buffer.clear();
} }
private final void readInTraceMetadata(final ByteBuffer buffer) { private final void readInHostApplicationMetaData(final ByteBuffer buffer) {
final int hostnameId = buffer.getInt(); final int hostnameId = buffer.getInt();
final int applicationId = buffer.getInt(); final int applicationId = buffer.getInt();
final String hostname = stringRegistry.getStringFromId(hostnameId); try {
final String application = stringRegistry.getStringFromId(applicationId); final String hostname = stringRegistry.getStringFromId(hostnameId);
final String application = stringRegistry.getStringFromId(applicationId);
if ((hostname != null) && (application != null)) {
hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application); hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application);
} else { } catch (final IdNotAvailableException e) {
final byte[] message = new byte[HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; putInWaitingMessages(buffer, HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.position(buffer.position()
- HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
} }
} }
...@@ -223,17 +226,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -223,17 +226,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int orderIndex = buffer.getInt(); final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final String operation = stringRegistry.getStringFromId(operationId); try {
final String operation = stringRegistry.getStringFromId(operationId);
if (operation != null) {
putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp)));
} else { } catch (final IdNotAvailableException e) {
final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4);
buffer.position(buffer.position()
- BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
} }
} }
...@@ -244,19 +243,15 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -244,19 +243,15 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final int causeId = buffer.getInt(); final int causeId = buffer.getInt();
final String operation = stringRegistry.getStringFromId(operationId); try {
final String cause = stringRegistry.getStringFromId(causeId); final String operation = stringRegistry.getStringFromId(operationId);
final String cause = stringRegistry.getStringFromId(causeId);
if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
operation, cause, hostApplicationMetadata, new RuntimeStatisticInformation( operation, cause, hostApplicationMetadata, new RuntimeStatisticInformation(
timestamp))); timestamp)));
} else { } catch (final IdNotAvailableException e) {
final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4 + 4);
buffer.position(buffer.position()
- AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
} }
} }
...@@ -266,15 +261,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -266,15 +261,13 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int orderIndex = buffer.getInt(); final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final String operation = stringRegistry.getStringFromId(operationId); try {
if (operation != null) { final String operation = stringRegistry.getStringFromId(operationId);
putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp)));
} else { } catch (final IdNotAvailableException e) {
final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; putInWaitingMessages(buffer, 1 + 8 + 8 + 4 + 4);
buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
} }
} }
...@@ -288,7 +281,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -288,7 +281,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
hostApplicationMetadata)); hostApplicationMetadata));
} }
private final void putInWaitingMessages(final byte[] message) { private final void putInWaitingMessages(final ByteBuffer buffer, final int length) {
final byte[] message = new byte[length];
buffer.position(buffer.position() - length);
buffer.get(message);
waitingForStringMessages.add(message); waitingForStringMessages.add(message);
} }
...@@ -304,7 +300,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -304,7 +300,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final byte waitingMessageClazzId = buffer.get(); final byte waitingMessageClazzId = buffer.get();
switch (waitingMessageClazzId) { switch (waitingMessageClazzId) {
case HostApplicationMetaDataRecord.CLAZZ_ID: case HostApplicationMetaDataRecord.CLAZZ_ID:
readInTraceMetadata(buffer); readInHostApplicationMetaData(buffer);
break; break;
case BeforeOperationEventRecord.CLAZZ_ID: case BeforeOperationEventRecord.CLAZZ_ID:
readInBeforeOperationEvent(buffer); readInBeforeOperationEvent(buffer);
......
...@@ -4,8 +4,8 @@ import static org.junit.Assert.assertTrue; ...@@ -4,8 +4,8 @@ import static org.junit.Assert.assertTrue;
import org.junit.Test; import org.junit.Test;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
public class TraceReconstructionBufferTest { public class TraceReconstructionBufferTest {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment