Skip to content
Snippets Groups Projects
Commit 7eefccd4 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

WiP

parent 1dc8d1bf
No related branches found
No related tags found
No related merge requests found
...@@ -9,7 +9,7 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999 ...@@ -9,7 +9,7 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999
explorviz.live_trace_processing.writer_load_balancing_wait_time=20000 explorviz.live_trace_processing.writer_load_balancing_wait_time=20000
explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker
explorviz.live_trace_processing.sending_buffer_size=16777216 explorviz.live_trace_processing.sending_buffer_size=2097152
######################## Monitoring ######################## ######################## Monitoring ########################
...@@ -38,7 +38,7 @@ explorviz.live_trace_processing.reader_listening_port=10133 ...@@ -38,7 +38,7 @@ explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.tcp_reader_disruptor_size=16384 explorviz.live_trace_processing.tcp_reader_disruptor_size=16384
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=4096 explorviz.live_trace_processing.trace_reconstruction_disruptor_size=256
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_summarization_disruptor_size=256 explorviz.live_trace_processing.trace_summarization_disruptor_size=64
...@@ -5,6 +5,8 @@ import java.util.Queue; ...@@ -5,6 +5,8 @@ import java.util.Queue;
import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.SinglePipeConnector; import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEventRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
...@@ -29,7 +31,14 @@ public class TraceCountingFilter extends AbstractFilter { ...@@ -29,7 +31,14 @@ public class TraceCountingFilter extends AbstractFilter {
public void processRecord(final IRecord record) { public void processRecord(final IRecord record) {
if (record instanceof Trace) { if (record instanceof Trace) {
final Trace trace = (Trace) record; final Trace trace = (Trace) record;
counter.inputObjectsCount(trace.getCalledTimes()); if (!trace.getTraceEvents().isEmpty()) {
final AbstractEventRecord abstractEventRecord = trace.getTraceEvents().get(0);
if (abstractEventRecord instanceof AbstractBeforeOperationEventRecord) {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractEventRecord;
counter.inputObjectsCount(abstractBeforeOperationEventRecord
.getRuntimeStatisticInformation().getCount());
}
}
deliver(record); deliver(record);
} else if (record instanceof TimedPeriodRecord) { } else if (record instanceof TimedPeriodRecord) {
deliver(record); deliver(record);
......
...@@ -33,7 +33,10 @@ class TraceReconstructionBuffer { ...@@ -33,7 +33,10 @@ class TraceReconstructionBuffer {
if ((event instanceof AbstractBeforeEventRecord)) { if ((event instanceof AbstractBeforeEventRecord)) {
openEvents++; openEvents++;
final AbstractBeforeEventRecord beforeEvent = (AbstractBeforeEventRecord) event; final AbstractBeforeEventRecord beforeEvent = (AbstractBeforeEventRecord) event;
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(1, -1, -1)); if (beforeEvent.getRuntimeStatisticInformation() == null) {
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(1, -1,
-1));
}
} else if ((event instanceof AbstractAfterFailedEventRecord) } else if ((event instanceof AbstractAfterFailedEventRecord)
|| (event instanceof AbstractAfterEventRecord)) { || (event instanceof AbstractAfterEventRecord)) {
openEvents--; openEvents--;
...@@ -91,14 +94,16 @@ class TraceReconstructionBuffer { ...@@ -91,14 +94,16 @@ class TraceReconstructionBuffer {
if (!stack.isEmpty()) { if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop(); final AbstractBeforeEventRecord beforeEvent = stack.pop();
if (beforeEvent.getRuntimeStatisticInformation() == null) {
final long time = event.getLoggingTimestamp() final long time = event.getLoggingTimestamp()
- beforeEvent.getLoggingTimestamp(); - beforeEvent.getLoggingTimestamp();
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(1, beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(
time, time * time)); 1, time, time * time));
}
} }
} }
} }
return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord, return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord,
1, events.size()); events.size());
} }
} }
...@@ -27,8 +27,6 @@ class TracesSummarizationBuffer { ...@@ -27,8 +27,6 @@ class TracesSummarizationBuffer {
if (accumulator == null) { if (accumulator == null) {
accumulator = trace; accumulator = trace;
} else { } else {
accumulator.setCalledTimes(accumulator.getCalledTimes() + trace.getCalledTimes());
final List<AbstractEventRecord> aggregatedRecords = accumulator.getTraceEvents(); final List<AbstractEventRecord> aggregatedRecords = accumulator.getTraceEvents();
final List<AbstractEventRecord> records = trace.getTraceEvents(); final List<AbstractEventRecord> records = trace.getTraceEvents();
......
package explorviz.live_trace_processing.filter.reduction; package explorviz.live_trace_processing.filter.reduction;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Queue; import java.util.Queue;
...@@ -80,8 +81,9 @@ public class TracesSummarizationFilter extends AbstractFilter { ...@@ -80,8 +81,9 @@ public class TracesSummarizationFilter extends AbstractFilter {
} }
public void makeTraceElementsAccumulator(final Trace trace) { public void makeTraceElementsAccumulator(final Trace trace) {
for (int i = 0; i < trace.getTraceEvents().size(); i++) { final List<AbstractEventRecord> traceEvents = trace.getTraceEvents();
final AbstractEventRecord event = trace.getTraceEvents().get(i);
for (final AbstractEventRecord event : traceEvents) {
if (event instanceof AbstractBeforeEventRecord) { if (event instanceof AbstractBeforeEventRecord) {
final AbstractBeforeEventRecord abstractBeforeEventRecord = (AbstractBeforeEventRecord) event; final AbstractBeforeEventRecord abstractBeforeEventRecord = (AbstractBeforeEventRecord) event;
......
...@@ -62,7 +62,7 @@ class TCPReaderOneClient extends Thread { ...@@ -62,7 +62,7 @@ class TCPReaderOneClient extends Thread {
@Override @Override
public void run() { public void run() {
final ByteBuffer buffer = ByteBuffer.allocate(16 * 1024 * 1024); final ByteBuffer buffer = ByteBuffer.allocate(2 * 1024 * 1024);
try { try {
String remoteAddress = ""; String remoteAddress = "";
if (socketChannel.isConnected()) { if (socketChannel.isConnected()) {
...@@ -194,13 +194,12 @@ class TCPReaderOneClient extends Thread { ...@@ -194,13 +194,12 @@ class TCPReaderOneClient extends Thread {
if (containsRemoteRecordByte == (byte) 0) { if (containsRemoteRecordByte == (byte) 0) {
containsRemoteRecord = false; containsRemoteRecord = false;
} }
final int calledTimes = buffer.getInt();
final int eventsLength = buffer.getInt(); final int eventsLength = buffer.getInt();
final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
eventsLength); eventsLength);
currentlyOpenTrace = new Trace(events, valid, containsRemoteRecord, currentlyOpenTrace = new Trace(events, valid, containsRemoteRecord,
calledTimes, eventsLength); eventsLength);
if (!readInTraceRecordChunks(buffer)) { if (!readInTraceRecordChunks(buffer)) {
return; return;
...@@ -406,7 +405,11 @@ class TCPReaderOneClient extends Thread { ...@@ -406,7 +405,11 @@ class TCPReaderOneClient extends Thread {
if (currentlyOpenTrace.getTraceEvents().size() == currentlyOpenTrace if (currentlyOpenTrace.getTraceEvents().size() == currentlyOpenTrace
.getEventsLength()) { .getEventsLength()) {
putInQueue(currentlyOpenTrace); final List<AbstractEventRecord> traceEvents = currentlyOpenTrace
.getTraceEvents();
for (final AbstractEventRecord event : traceEvents) {
putInQueue(event);
}
currentlyOpenTrace = null; currentlyOpenTrace = null;
return !cleared; return !cleared;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment