Skip to content
Snippets Groups Projects
Commit 96b57e0a authored by Florian Fittkau's avatar Florian Fittkau
Browse files

minor

parent 0f08efe0
No related branches found
No related tags found
No related merge requests found
......@@ -14,23 +14,16 @@ import explorviz.live_trace_processing.reader.TCPReader;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final ITraceSink sink) {
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
// final IRecordCounting recordCounting = new
// RecordCountingFilter(sink);
final ITraceReduction traceReduction = new TracesSummarizationFilter(
TimeUnit.MILLISECONDS.toNanos(990), sink);
// final PatternSummarizationFilter oneTraceReduction = new
// PatternSummarizationFilter(
// traceReduction, 300, 1);
final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter(
TimeUnit.SECONDS.toNanos(4), traceReduction);
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
10133), isWorker, traceReconstruction).read();
10133), traceReconstruction).read();
}
}
......@@ -35,12 +35,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>();
private final boolean isWorker;
public TCPReader(final int listeningPort, final boolean isWorker,
final ITraceReconstruction traceReconstruction) {
public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) {
this.listeningPort = listeningPort;
this.isWorker = isWorker;
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE),
......@@ -86,7 +82,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
open();
while (active) {
final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(),
isWorker, ringBuffer);
ringBuffer);
thread.start();
threads.add(thread);
}
......
......@@ -60,14 +60,11 @@ class TCPReaderOneClient extends Thread {
private String remoteAddress = "";
private final boolean isWorker;
private Trace currentlyOpenTrace;
public TCPReaderOneClient(final SocketChannel socketChannel, final boolean isWorker,
public TCPReaderOneClient(final SocketChannel socketChannel,
final RingBuffer<RecordArrayEvent> ringBuffer) {
this.socketChannel = socketChannel;
this.isWorker = isWorker;
this.ringBuffer = ringBuffer;
}
......@@ -180,24 +177,10 @@ class TCPReaderOneClient extends Thread {
break;
}
case SystemMonitoringRecord.CLAZZ_ID: {
// if (isWorker) {
if (buffer.remaining() >= SystemMonitoringRecord.COMPRESSED_BYTE_LENGTH) {
readInSystemMonitoringRecord(buffer);
break;
}
// } else {
// if (buffer.remaining() >=
// SystemMonitoringRecord.BYTE_LENGTH) {
// try {
// putInRingBuffer(SystemMonitoringRecord.createFromByteBuffer(
// buffer, stringRegistry));
// } catch (final IdNotAvailableException e) {
// // should not happen
// e.printStackTrace();
// }
// break;
// }
// }
buffer.position(buffer.position() - 1);
buffer.compact();
return;
......@@ -339,6 +322,21 @@ class TCPReaderOneClient extends Thread {
buffer.compact();
return;
}
case SystemMonitoringRecord.CLAZZ_ID_FROM_WORKER: {
if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) {
try {
putInRingBuffer(SystemMonitoringRecord.createFromByteBuffer(buffer,
stringRegistry));
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
}
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
default: {
System.out.println("unknown class id " + clazzId + " at offset "
+ (buffer.position() - 1));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment