Skip to content
Snippets Groups Projects
Commit 9c963c0d authored by Florian Fittkau's avatar Florian Fittkau
Browse files

tweaks and buffer sizes

parent 7978b9f0
No related branches found
No related tags found
No related merge requests found
...@@ -14,13 +14,13 @@ explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-wor ...@@ -14,13 +14,13 @@ explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-wor
explorviz.live_trace_processing.sending_buffer_size=65536 explorviz.live_trace_processing.sending_buffer_size=65536
explorviz.live_trace_processing.monitoring_controller_disruptor_size=32 explorviz.live_trace_processing.monitoring_controller_disruptor_size=32
explorviz.live_trace_processing.tcp_reader_output_buffer_size=1024 explorviz.live_trace_processing.tcp_reader_output_buffer_size=8192
explorviz.live_trace_processing.tcp_reader_disruptor_size=32 explorviz.live_trace_processing.tcp_reader_disruptor_size=32
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=256 explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=512
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32 explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_summarization_output_buffer_size=256 explorviz.live_trace_processing.trace_summarization_output_buffer_size=64
explorviz.live_trace_processing.trace_summarization_disruptor_size=16 explorviz.live_trace_processing.trace_summarization_disruptor_size=16
\ No newline at end of file
...@@ -11,14 +11,10 @@ import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventR ...@@ -11,14 +11,10 @@ import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventR
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TraceReconstructionBuffer { class TraceReconstructionBuffer {
// private static final Comparator<AbstractOperationEvent> COMPARATOR = new
// AbstractOperationEventComperator();
private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>(
Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE);
private boolean closeable; private boolean closeable;
private boolean damaged;
private int openEvents; private int openEvents;
...@@ -40,11 +36,7 @@ class TraceReconstructionBuffer { ...@@ -40,11 +36,7 @@ class TraceReconstructionBuffer {
openEvents--; openEvents--;
} }
if (!events.add(event)) { // TODO events.add(event);
System.out.println("Duplicate entry for orderIndex " + orderIndex + " with traceId "
+ event.getTraceId());
damaged = true;
}
} }
public final long getMaxLoggingTimestamp() { public final long getMaxLoggingTimestamp() {
...@@ -71,20 +63,10 @@ class TraceReconstructionBuffer { ...@@ -71,20 +63,10 @@ class TraceReconstructionBuffer {
} }
public final boolean isInvalid() { public final boolean isInvalid() {
return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()) return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()) || !closeable);
|| damaged || !closeable);
} }
public final Trace toTrace(final boolean valid) { public final Trace toTrace(final boolean valid) {
return new Trace(events, valid); return new Trace(events, valid);
} }
// private static final class AbstractOperationEventComperator implements
// Comparator<AbstractOperationEvent> {
// @Override
// public int compare(final AbstractOperationEvent o1, final
// AbstractOperationEvent o2) {
// return o1.getOrderIndex() - o2.getOrderIndex();
// }
// }
} }
...@@ -32,7 +32,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -32,7 +32,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} }
@Override @Override
public void processRecord(final IRecord record) { public final void processRecord(final IRecord record) {
if (record instanceof AbstractOperationEventRecord) { if (record instanceof AbstractOperationEventRecord) {
final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record);
...@@ -43,7 +43,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -43,7 +43,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
if (traceBuffer.isFinished()) { if (traceBuffer.isFinished()) {
traceId2trace.remove(traceId); traceId2trace.remove(traceId);
sendOutValidTrace(traceBuffer.toTrace(true)); deliver(traceBuffer.toTrace(true));
} }
} else if (record instanceof Trace) { } else if (record instanceof Trace) {
final Trace trace = (Trace) record; final Trace trace = (Trace) record;
...@@ -64,7 +64,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -64,7 +64,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} }
} }
private TraceReconstructionBuffer getBufferForTraceId(final long traceId) { private final TraceReconstructionBuffer getBufferForTraceId(final long traceId) {
TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId); TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { if (traceBuffer == null) {
traceBuffer = new TraceReconstructionBuffer(); traceBuffer = new TraceReconstructionBuffer();
...@@ -80,7 +80,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -80,7 +80,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) {
final TraceReconstructionBuffer traceBuffer = entry.getValue(); final TraceReconstructionBuffer traceBuffer = entry.getValue();
if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
sendOutInvalidTrace(traceBuffer.toTrace(false)); deliver(traceBuffer.toTrace(false));
traceIdsToRemove.add(entry.getKey()); traceIdsToRemove.add(entry.getKey());
} }
} }
...@@ -90,18 +90,9 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -90,18 +90,9 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} }
} }
private void sendOutValidTrace(final Trace trace) {
trace.setValid(true);
deliver(trace);
}
private void sendOutInvalidTrace(final Trace trace) {
deliver(trace);
}
private void terminate() { private void terminate() {
for (final TraceReconstructionBuffer entry : traceId2trace.values()) { for (final TraceReconstructionBuffer entry : traceId2trace.values()) {
sendOutInvalidTrace(entry.toTrace(false)); deliver(entry.toTrace(false));
} }
traceId2trace.clear(); traceId2trace.clear();
} }
......
...@@ -22,10 +22,6 @@ class TracePatternSummarizationBuffer { ...@@ -22,10 +22,6 @@ class TracePatternSummarizationBuffer {
} }
public void insertTrace(final Trace trace) { public void insertTrace(final Trace trace) {
aggregate(trace);
}
private void aggregate(final Trace trace) {
if (accumulator == null) { if (accumulator == null) {
accumulator = trace; accumulator = trace;
} else { } else {
......
...@@ -37,7 +37,7 @@ public final class TCPReader { ...@@ -37,7 +37,7 @@ public final class TCPReader {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(2), endReceiver); eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(5), endReceiver);
disruptor.handleEventsWith(eventHandlers); disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start(); ringBuffer = disruptor.start();
} }
......
...@@ -46,12 +46,14 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -46,12 +46,14 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
@Override @Override
public void run() { public void run() {
final ByteBuffer buffer = ByteBuffer ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024);
.allocateDirect(Constants.SENDING_BUFFER_SIZE);
try { try {
while ((socketChannel.read(buffer)) != -1) { while ((socketChannel.read(buffer)) != -1) {
buffer.flip(); buffer.flip();
messagesfromByteArray(buffer); messagesfromByteArray(buffer);
if (buffer.capacity() == buffer.position()) {
buffer = ByteBuffer.allocateDirect(2 * buffer.capacity());
}
} }
} catch (final IOException ex) { } catch (final IOException ex) {
System.out.println("Error in read() " + ex.getMessage()); System.out.println("Error in read() " + ex.getMessage());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment