Skip to content
Snippets Groups Projects
Commit ab56e601 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

adaptive monitoring

parent abe1ef45
Branches
Tags
No related merge requests found
package explorviz.live_trace_processing.reader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -14,6 +18,7 @@ import com.lmax.disruptor.RingBuffer;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.adaptive_monitoring.AdaptiveMonitoringPatternList;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -33,6 +38,7 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.writer.RemoteConfigurator;
class TCPReaderOneClient extends Thread {
......@@ -49,6 +55,8 @@ class TCPReaderOneClient extends Thread {
private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE];
private int outputBufferIndex = 0;
private String remoteAddress = "";
public TCPReaderOneClient(final SocketChannel socketChannel,
final RingBuffer<RecordArrayEvent> ringBuffer) {
this.socketChannel = socketChannel;
......@@ -60,17 +68,35 @@ class TCPReaderOneClient extends Thread {
final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024);
try {
if (socketChannel.isConnected()) {
LOG.info("Client " + socketChannel.getRemoteAddress() + " connected.");
remoteAddress = ((InetSocketAddress) socketChannel.getRemoteAddress())
.getHostName();
LOG.info("Client " + remoteAddress + " connected.");
}
RemoteConfigurationServlet.getConnectedChildren().add(remoteAddress);
sendAdaptiveMonitoringList();
while ((socketChannel.read(buffer)) != -1) {
buffer.flip();
messagesfromByteArray(buffer);
}
} catch (final IOException ex) {
RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress);
LOG.info("Error in read() " + ex.getMessage());
}
}
private void sendAdaptiveMonitoringList() {
final ConcurrentHashMap<String, Set<String>> patternMap = AdaptiveMonitoringPatternList
.getApplicationToPatternMap();
for (final Entry<String, Set<String>> entry : patternMap.entrySet()) {
for (final String pattern : entry.getValue()) {
RemoteConfigurator.addPattern(remoteAddress, pattern, entry.getKey());
}
}
}
private final void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
final byte clazzId = buffer.get();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment