From ab56e601e6ace3edce7cc43ddcc2d21aba94aae5 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Mon, 13 Oct 2014 18:03:47 +0200 Subject: [PATCH] adaptive monitoring --- .../reader/TCPReaderOneClient.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index b28db6d..e671bcf 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -1,10 +1,14 @@ package explorviz.live_trace_processing.reader; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +18,7 @@ import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.IdNotAvailableException; import explorviz.live_trace_processing.StringRegistry; +import explorviz.live_trace_processing.adaptive_monitoring.AdaptiveMonitoringPatternList; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord; @@ -33,6 +38,7 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.Trace; +import explorviz.live_trace_processing.writer.RemoteConfigurator; class TCPReaderOneClient extends Thread { @@ -49,6 +55,8 @@ class TCPReaderOneClient extends Thread { private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE]; private int outputBufferIndex = 0; + private String remoteAddress = ""; + public TCPReaderOneClient(final SocketChannel socketChannel, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; @@ -60,17 +68,35 @@ class TCPReaderOneClient extends Thread { final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); try { if (socketChannel.isConnected()) { - LOG.info("Client " + socketChannel.getRemoteAddress() + " connected."); + remoteAddress = ((InetSocketAddress) socketChannel.getRemoteAddress()) + .getHostName(); + LOG.info("Client " + remoteAddress + " connected."); } + + RemoteConfigurationServlet.getConnectedChildren().add(remoteAddress); + + sendAdaptiveMonitoringList(); + while ((socketChannel.read(buffer)) != -1) { buffer.flip(); messagesfromByteArray(buffer); } } catch (final IOException ex) { + RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress); LOG.info("Error in read() " + ex.getMessage()); } } + private void sendAdaptiveMonitoringList() { + final ConcurrentHashMap<String, Set<String>> patternMap = AdaptiveMonitoringPatternList + .getApplicationToPatternMap(); + for (final Entry<String, Set<String>> entry : patternMap.entrySet()) { + for (final String pattern : entry.getValue()) { + RemoteConfigurator.addPattern(remoteAddress, pattern, entry.getKey()); + } + } + } + private final void messagesfromByteArray(final ByteBuffer buffer) { while (buffer.remaining() > 0) { final byte clazzId = buffer.get(); -- GitLab