diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index b28db6dcbf13079873678dc297cbfbf96b6b1303..e671bcf001d2102569c7cf2bb2b445870b0fb0d0 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -1,10 +1,14 @@ package explorviz.live_trace_processing.reader; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +18,7 @@ import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.IdNotAvailableException; import explorviz.live_trace_processing.StringRegistry; +import explorviz.live_trace_processing.adaptive_monitoring.AdaptiveMonitoringPatternList; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.event.AbstractEventRecord; @@ -33,6 +38,7 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.Trace; +import explorviz.live_trace_processing.writer.RemoteConfigurator; class TCPReaderOneClient extends Thread { @@ -49,6 +55,8 @@ class TCPReaderOneClient extends Thread { private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE]; private int outputBufferIndex = 0; + private String remoteAddress = ""; + public TCPReaderOneClient(final SocketChannel socketChannel, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; @@ -60,17 +68,35 @@ class TCPReaderOneClient extends Thread { final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); try { if (socketChannel.isConnected()) { - LOG.info("Client " + socketChannel.getRemoteAddress() + " connected."); + remoteAddress = ((InetSocketAddress) socketChannel.getRemoteAddress()) + .getHostName(); + LOG.info("Client " + remoteAddress + " connected."); } + + RemoteConfigurationServlet.getConnectedChildren().add(remoteAddress); + + sendAdaptiveMonitoringList(); + while ((socketChannel.read(buffer)) != -1) { buffer.flip(); messagesfromByteArray(buffer); } } catch (final IOException ex) { + RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress); LOG.info("Error in read() " + ex.getMessage()); } } + private void sendAdaptiveMonitoringList() { + final ConcurrentHashMap<String, Set<String>> patternMap = AdaptiveMonitoringPatternList + .getApplicationToPatternMap(); + for (final Entry<String, Set<String>> entry : patternMap.entrySet()) { + for (final String pattern : entry.getValue()) { + RemoteConfigurator.addPattern(remoteAddress, pattern, entry.getKey()); + } + } + } + private final void messagesfromByteArray(final ByteBuffer buffer) { while (buffer.remaining() > 0) { final byte clazzId = buffer.get();