From b6f1f87f8b495b968a0699e6d29c1f76d596e52c Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Tue, 10 Feb 2015 23:20:37 +0100 Subject: [PATCH] bug fixes --- ...z.live_trace_processing.default.properties | 4 +-- .../main/FilterConfiguration.java | 5 +++- .../reader/TCPReader.java | 8 ++++-- .../reader/TCPReaderOneClient.java | 28 +++++++++++++++---- 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index 35a12f1..1fe5c60 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -9,7 +9,7 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999 explorviz.live_trace_processing.writer_load_balancing_wait_time=20000 explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker -explorviz.live_trace_processing.sending_buffer_size=2097152 +explorviz.live_trace_processing.sending_buffer_size=16777216 ######################## Monitoring ######################## @@ -24,7 +24,7 @@ explorviz.live_trace_processing.debug=false explorviz.live_trace_processing.android_monitoring=false explorviz.live_trace_processing.monitoring_enabled=true -explorviz.live_trace_processing.system_monitoring_enabled=true +explorviz.live_trace_processing.system_monitoring_enabled=false explorviz.live_trace_processing.continous_monitoring_enabled=false diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java index 2547109..a0430e4 100644 --- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -14,6 +14,9 @@ import explorviz.live_trace_processing.reader.TCPReader; public class FilterConfiguration { public static void configureAndStartFilters(final Configuration configuration, final ITraceSink sink) { + final boolean isWorker = configuration + .getBooleanProperty(ConfigurationFactory.WORKER_ENABLED); + // final IRecordCounting recordCounting = new // RecordCountingFilter(sink); @@ -28,6 +31,6 @@ public class FilterConfiguration { TimeUnit.SECONDS.toNanos(4), traceReduction); new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT, - 10133), traceReconstruction).read(); + 10133), isWorker, traceReconstruction).read(); } } diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index 11263c3..926fe00 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -35,8 +35,12 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>(); - public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) { + private final boolean isWorker; + + public TCPReader(final int listeningPort, final boolean isWorker, + final ITraceReconstruction traceReconstruction) { this.listeningPort = listeningPort; + this.isWorker = isWorker; final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE), @@ -82,7 +86,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { open(); while (active) { final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(), - ringBuffer); + isWorker, ringBuffer); thread.start(); threads.add(thread); } diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index cabd6d2..e4624cd 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -60,15 +60,18 @@ class TCPReaderOneClient extends Thread { private String remoteAddress = ""; - public TCPReaderOneClient(final SocketChannel socketChannel, + private final boolean isWorker; + + public TCPReaderOneClient(final SocketChannel socketChannel, final boolean isWorker, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; + this.isWorker = isWorker; this.ringBuffer = ringBuffer; } @Override public void run() { - final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); + final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024); try { if (socketChannel.isConnected()) { remoteAddress = ((InetSocketAddress) socketChannel.getRemoteAddress()) @@ -169,9 +172,21 @@ class TCPReaderOneClient extends Thread { break; } case SystemMonitoringRecord.CLAZZ_ID: { - if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) { - readInSystemMonitoringRecord(buffer); - break; + if (isWorker) { + if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) { + readInSystemMonitoringRecord(buffer); + break; + } + } else { + if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) { + try { + SystemMonitoringRecord.createFromByteBuffer(buffer, stringRegistry); + } catch (final IdNotAvailableException e) { + // should not happen + e.printStackTrace(); + } + break; + } } buffer.position(buffer.position() - 1); buffer.compact(); @@ -192,6 +207,7 @@ class TCPReaderOneClient extends Thread { final int calledTimes = buffer.getInt(); final int eventsLength = buffer.getInt(); final int byteLength = buffer.getInt(); + if ((buffer.remaining() >= byteLength) && (eventsLength > 0)) { final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( eventsLength); @@ -209,7 +225,7 @@ class TCPReaderOneClient extends Thread { calledTimes)); break; } - buffer.position(buffer.position() - 10); + buffer.position(buffer.position() - 15); buffer.compact(); return; } -- GitLab