diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index 35a12f1bc86dd0b41444812a8337d1fb638216e8..1fe5c6056f2bba37c8eb2ae7e4264a50bc018371 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -9,7 +9,7 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999 explorviz.live_trace_processing.writer_load_balancing_wait_time=20000 explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker -explorviz.live_trace_processing.sending_buffer_size=2097152 +explorviz.live_trace_processing.sending_buffer_size=16777216 ######################## Monitoring ######################## @@ -24,7 +24,7 @@ explorviz.live_trace_processing.debug=false explorviz.live_trace_processing.android_monitoring=false explorviz.live_trace_processing.monitoring_enabled=true -explorviz.live_trace_processing.system_monitoring_enabled=true +explorviz.live_trace_processing.system_monitoring_enabled=false explorviz.live_trace_processing.continous_monitoring_enabled=false diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java index 2547109f69a99c252c315b3663ff9ecbb314011f..a0430e4206453ff2be1bf91f485ccd5021dc6b94 100644 --- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -14,6 +14,9 @@ import explorviz.live_trace_processing.reader.TCPReader; public class FilterConfiguration { public static void configureAndStartFilters(final Configuration configuration, final ITraceSink sink) { + final boolean isWorker = configuration + .getBooleanProperty(ConfigurationFactory.WORKER_ENABLED); + // final IRecordCounting recordCounting = new // RecordCountingFilter(sink); @@ -28,6 +31,6 @@ public class FilterConfiguration { TimeUnit.SECONDS.toNanos(4), traceReduction); new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT, - 10133), traceReconstruction).read(); + 10133), isWorker, traceReconstruction).read(); } } diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index 11263c3918858a316bf91adc547b336d106befaa..926fe00acb093507fdbed32c07927848923103f4 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -35,8 +35,12 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>(); - public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) { + private final boolean isWorker; + + public TCPReader(final int listeningPort, final boolean isWorker, + final ITraceReconstruction traceReconstruction) { this.listeningPort = listeningPort; + this.isWorker = isWorker; final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE), @@ -82,7 +86,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { open(); while (active) { final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(), - ringBuffer); + isWorker, ringBuffer); thread.start(); threads.add(thread); } diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index cabd6d2e543f89c134477020eb2c1d24ad71f6e5..e4624cd59ed0ba7ee7cd7fc7fc36dac5fa2869ad 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -60,15 +60,18 @@ class TCPReaderOneClient extends Thread { private String remoteAddress = ""; - public TCPReaderOneClient(final SocketChannel socketChannel, + private final boolean isWorker; + + public TCPReaderOneClient(final SocketChannel socketChannel, final boolean isWorker, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; + this.isWorker = isWorker; this.ringBuffer = ringBuffer; } @Override public void run() { - final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); + final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024); try { if (socketChannel.isConnected()) { remoteAddress = ((InetSocketAddress) socketChannel.getRemoteAddress()) @@ -169,9 +172,21 @@ class TCPReaderOneClient extends Thread { break; } case SystemMonitoringRecord.CLAZZ_ID: { - if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) { - readInSystemMonitoringRecord(buffer); - break; + if (isWorker) { + if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) { + readInSystemMonitoringRecord(buffer); + break; + } + } else { + if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) { + try { + SystemMonitoringRecord.createFromByteBuffer(buffer, stringRegistry); + } catch (final IdNotAvailableException e) { + // should not happen + e.printStackTrace(); + } + break; + } } buffer.position(buffer.position() - 1); buffer.compact(); @@ -192,6 +207,7 @@ class TCPReaderOneClient extends Thread { final int calledTimes = buffer.getInt(); final int eventsLength = buffer.getInt(); final int byteLength = buffer.getInt(); + if ((buffer.remaining() >= byteLength) && (eventsLength > 0)) { final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( eventsLength); @@ -209,7 +225,7 @@ class TCPReaderOneClient extends Thread { calledTimes)); break; } - buffer.position(buffer.position() - 10); + buffer.position(buffer.position() - 15); buffer.compact(); return; }