From 9c963c0dd2115308b64ead5945ee3e3c81a755e6 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Tue, 3 Dec 2013 23:20:23 +0100 Subject: [PATCH] tweaks and buffer sizes --- ...z.live_trace_processing.default.properties | 6 ++--- .../TraceReconstructionBuffer.java | 22 ++----------------- .../TraceReconstructionFilter.java | 19 +++++----------- .../TracePatternSummarizationBuffer.java | 4 ---- .../reader/TCPReader.java | 2 +- .../reader/TCPReaderOneClient.java | 6 +++-- 6 files changed, 15 insertions(+), 44 deletions(-) diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index a690ba6..c52ffc7 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -14,13 +14,13 @@ explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-wor explorviz.live_trace_processing.sending_buffer_size=65536 explorviz.live_trace_processing.monitoring_controller_disruptor_size=32 -explorviz.live_trace_processing.tcp_reader_output_buffer_size=1024 +explorviz.live_trace_processing.tcp_reader_output_buffer_size=8192 explorviz.live_trace_processing.tcp_reader_disruptor_size=32 -explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=256 +explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=512 explorviz.live_trace_processing.trace_reconstruction_disruptor_size=32 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 -explorviz.live_trace_processing.trace_summarization_output_buffer_size=256 +explorviz.live_trace_processing.trace_summarization_output_buffer_size=64 explorviz.live_trace_processing.trace_summarization_disruptor_size=16 \ No newline at end of file diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index a2d790f..1f8203e 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -11,14 +11,10 @@ import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventR import explorviz.live_trace_processing.record.trace.Trace; class TraceReconstructionBuffer { - // private static final Comparator<AbstractOperationEvent> COMPARATOR = new - // AbstractOperationEventComperator(); - private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); private boolean closeable; - private boolean damaged; private int openEvents; @@ -40,11 +36,7 @@ class TraceReconstructionBuffer { openEvents--; } - if (!events.add(event)) { // TODO - System.out.println("Duplicate entry for orderIndex " + orderIndex + " with traceId " - + event.getTraceId()); - damaged = true; - } + events.add(event); } public final long getMaxLoggingTimestamp() { @@ -71,20 +63,10 @@ class TraceReconstructionBuffer { } public final boolean isInvalid() { - return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()) - || damaged || !closeable); + return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()) || !closeable); } public final Trace toTrace(final boolean valid) { return new Trace(events, valid); } - - // private static final class AbstractOperationEventComperator implements - // Comparator<AbstractOperationEvent> { - // @Override - // public int compare(final AbstractOperationEvent o1, final - // AbstractOperationEvent o2) { - // return o1.getOrderIndex() - o2.getOrderIndex(); - // } - // } } diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index ce71458..ec030d4 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -32,7 +32,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { } @Override - public void processRecord(final IRecord record) { + public final void processRecord(final IRecord record) { if (record instanceof AbstractOperationEventRecord) { final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); @@ -43,7 +43,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { if (traceBuffer.isFinished()) { traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace(true)); + deliver(traceBuffer.toTrace(true)); } } else if (record instanceof Trace) { final Trace trace = (Trace) record; @@ -64,7 +64,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { } } - private TraceReconstructionBuffer getBufferForTraceId(final long traceId) { + private final TraceReconstructionBuffer getBufferForTraceId(final long traceId) { TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId); if (traceBuffer == null) { traceBuffer = new TraceReconstructionBuffer(); @@ -80,7 +80,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { final TraceReconstructionBuffer traceBuffer = entry.getValue(); if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { - sendOutInvalidTrace(traceBuffer.toTrace(false)); + deliver(traceBuffer.toTrace(false)); traceIdsToRemove.add(entry.getKey()); } } @@ -90,18 +90,9 @@ public final class TraceReconstructionFilter extends AbstractFilter { } } - private void sendOutValidTrace(final Trace trace) { - trace.setValid(true); - deliver(trace); - } - - private void sendOutInvalidTrace(final Trace trace) { - deliver(trace); - } - private void terminate() { for (final TraceReconstructionBuffer entry : traceId2trace.values()) { - sendOutInvalidTrace(entry.toTrace(false)); + deliver(entry.toTrace(false)); } traceId2trace.clear(); } diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java index 0705752..f77249f 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java @@ -22,10 +22,6 @@ class TracePatternSummarizationBuffer { } public void insertTrace(final Trace trace) { - aggregate(trace); - } - - private void aggregate(final Trace trace) { if (accumulator == null) { accumulator = trace; } else { diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index 9b13249..c449d95 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -37,7 +37,7 @@ public final class TCPReader { @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(2), endReceiver); + eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(5), endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index e02b689..b3707a2 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -46,12 +46,14 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec @Override public void run() { - final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.SENDING_BUFFER_SIZE); + ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); try { while ((socketChannel.read(buffer)) != -1) { buffer.flip(); messagesfromByteArray(buffer); + if (buffer.capacity() == buffer.position()) { + buffer = ByteBuffer.allocateDirect(2 * buffer.capacity()); + } } } catch (final IOException ex) { System.out.println("Error in read() " + ex.getMessage()); -- GitLab