Skip to content
Snippets Groups Projects
Commit 8cd77fd3 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

record serialization

parent 303de702
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
<classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/> <classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/>
<classpathentry kind="src" path="test"/> <classpathentry kind="src" path="test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"> <classpathentry kind="lib" path="lib/disruptor-3.2.0.jar">
<attributes> <attributes>
<attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/disruptor-3.2.0-javadoc.jar!/"/> <attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/disruptor-3.2.0-javadoc.jar!/"/>
......
...@@ -7,7 +7,6 @@ import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord ...@@ -7,7 +7,6 @@ import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TraceReconstructionBuffer { class TraceReconstructionBuffer {
...@@ -16,7 +15,6 @@ class TraceReconstructionBuffer { ...@@ -16,7 +15,6 @@ class TraceReconstructionBuffer {
private static final int INITIAL_EVENT_CAPACITY = 100; private static final int INITIAL_EVENT_CAPACITY = 100;
private HostApplicationMetaDataRecord traceMetadata;
private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>(
INITIAL_EVENT_CAPACITY); INITIAL_EVENT_CAPACITY);
...@@ -69,14 +67,6 @@ class TraceReconstructionBuffer { ...@@ -69,14 +67,6 @@ class TraceReconstructionBuffer {
return orderIndex; return orderIndex;
} }
public void setTrace(final HostApplicationMetaDataRecord trace) {
if (traceMetadata != null) {
damaged = true;
return;
}
traceMetadata = trace;
}
public final boolean isFinished() { public final boolean isFinished() {
return !isInvalid(); return !isInvalid();
} }
...@@ -87,7 +77,7 @@ class TraceReconstructionBuffer { ...@@ -87,7 +77,7 @@ class TraceReconstructionBuffer {
} }
public final Trace toTrace() { public final Trace toTrace() {
return new Trace(traceMetadata, events, events.get(0).getTraceId()); return new Trace(events, false);
} }
// private static final class AbstractOperationEventComperator implements // private static final class AbstractOperationEventComperator implements
......
...@@ -13,7 +13,6 @@ import explorviz.live_trace_processing.filter.reduction.summarization.TracePatte ...@@ -13,7 +13,6 @@ import explorviz.live_trace_processing.filter.reduction.summarization.TracePatte
import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
...@@ -40,9 +39,8 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -40,9 +39,8 @@ public final class TraceReconstructionFilter extends AbstractFilter {
final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record);
final long traceId = abstractOperationEvent.getTraceId(); final long traceId = abstractOperationEvent.getTraceId();
final TraceReconstructionBuffer traceBuffer = getBufferForTraceId( final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(abstractOperationEvent
abstractOperationEvent.getTraceId(), .getTraceId());
abstractOperationEvent.getHostApplicationMetadata());
traceBuffer.insertEvent(abstractOperationEvent); traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) { if (traceBuffer.isFinished()) {
...@@ -54,7 +52,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -54,7 +52,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
if (trace.isValid()) { if (trace.isValid()) {
deliver(trace); deliver(trace);
} else { } else {
getBufferForTraceId(trace.getTraceId(), trace.getHostMetadata()); getBufferForTraceId(trace.getTraceEvents().get(0).getTraceId());
} }
} else if (record instanceof TimedPeriodRecord) { } else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp()); checkForTimeouts(TimeProvider.getCurrentTimestamp());
...@@ -66,12 +64,10 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -66,12 +64,10 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} }
} }
private TraceReconstructionBuffer getBufferForTraceId(final long traceId, private TraceReconstructionBuffer getBufferForTraceId(final long traceId) {
final HostApplicationMetaDataRecord metadata) {
TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId); TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { if (traceBuffer == null) {
traceBuffer = new TraceReconstructionBuffer(); traceBuffer = new TraceReconstructionBuffer();
traceBuffer.setTrace(metadata);
traceId2trace.put(traceId, traceBuffer); traceId2trace.put(traceId, traceBuffer);
} }
return traceBuffer; return traceBuffer;
......
...@@ -29,14 +29,13 @@ class TracePatternSummarizationBuffer { ...@@ -29,14 +29,13 @@ class TracePatternSummarizationBuffer {
if (accumulator == null) { if (accumulator == null) {
accumulator = trace; accumulator = trace;
} else { } else {
final List<AbstractOperationEventRecord> aggregatedRecords = accumulator.getTraceEvents(); final List<AbstractOperationEventRecord> aggregatedRecords = accumulator
.getTraceEvents();
final List<AbstractOperationEventRecord> records = trace.getTraceEvents(); final List<AbstractOperationEventRecord> records = trace.getTraceEvents();
for (int i = 0; i < aggregatedRecords.size(); i++) { for (int i = 0; i < aggregatedRecords.size(); i++) {
aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime());
} }
accumulator.getRuntime().merge(trace.getRuntime());
} }
} }
} }
\ No newline at end of file
...@@ -15,13 +15,14 @@ import explorviz.live_trace_processing.Constants; ...@@ -15,13 +15,14 @@ import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver { public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver {
private HostApplicationMetaDataRecord hostApplicationMetadata; private HostApplicationMetaDataRecord hostApplicationMetadata;
...@@ -195,7 +196,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -195,7 +196,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
if (operation != null) { if (operation != null) {
putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata)); operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp)));
} else { } else {
final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() buffer.position(buffer.position()
...@@ -217,7 +218,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -217,7 +218,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
if ((operation != null) && (cause != null)) { if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
operation, cause, hostApplicationMetadata)); operation, cause, hostApplicationMetadata, new RuntimeStatisticInformation(
timestamp)));
} else { } else {
final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() buffer.position(buffer.position()
...@@ -236,7 +238,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -236,7 +238,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final String operation = getStringFromRegistry(operationId); final String operation = getStringFromRegistry(operationId);
if (operation != null) { if (operation != null) {
putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata)); operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp)));
} else { } else {
final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
......
package explorviz.live_trace_processing.filter.reconstruction;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
public class TraceReconstructionBufferTest {
@Test
public void testInsertEvent() throws Exception {
final TraceReconstructionBuffer traceReconstructionBuffer = new TraceReconstructionBuffer();
traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1000, 1, 0, "test",
new HostApplicationMetaDataRecord("testHost", "testApp"),
new RuntimeStatisticInformation(1000)));
assertEquals(1000, traceReconstructionBuffer.getMaxLoggingTimestamp());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment