Skip to content
Snippets Groups Projects
Commit 6a8700a3 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

WiP

parent 195f5c7d
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,8 @@ import java.io.IOException; ...@@ -4,6 +4,8 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -26,6 +28,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { ...@@ -26,6 +28,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private final Queue<IRecord> periodicSignalQueue; private final Queue<IRecord> periodicSignalQueue;
private final ExecutorService threadPool;
public TCPReader(final int listeningPort, final PipesMerger<IRecord> traceReconstructionMerger) { public TCPReader(final int listeningPort, final PipesMerger<IRecord> traceReconstructionMerger) {
this.listeningPort = listeningPort; this.listeningPort = listeningPort;
...@@ -33,6 +37,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { ...@@ -33,6 +37,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start(); new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start();
periodicSignalQueue = merger.registerProducer(); periodicSignalQueue = merger.registerProducer();
threadPool = Executors.newCachedThreadPool();
} }
@Override @Override
...@@ -50,7 +56,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { ...@@ -50,7 +56,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
try { try {
open(); open();
while (active) { while (active) {
new TCPReaderOneClient(serversocket.accept(), merger).start(); threadPool.execute(new TCPReaderOneClient(serversocket.accept(), merger));
} }
} catch (final IOException ex) { } catch (final IOException ex) {
LOG.info("Error in read() " + ex.getMessage()); LOG.info("Error in read() " + ex.getMessage());
...@@ -72,8 +78,10 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { ...@@ -72,8 +78,10 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
public final void terminate(final boolean error) { public final void terminate(final boolean error) {
LOG.info("Shutdown of TCPReader requested."); LOG.info("Shutdown of TCPReader requested.");
active = false; active = false;
// for (final TCPReaderOneClient thread : threads) { try {
// thread.terminate(); threadPool.awaitTermination(10, TimeUnit.SECONDS);
// } } catch (final InterruptedException e) {
}
threadPool.shutdown();
} }
} }
...@@ -34,7 +34,7 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord; ...@@ -34,7 +34,7 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
class TCPReaderOneClient extends Thread { class TCPReaderOneClient implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(TCPReaderOneClient.class); private static final Logger LOG = LoggerFactory.getLogger(TCPReaderOneClient.class);
...@@ -895,14 +895,12 @@ class TCPReaderOneClient extends Thread { ...@@ -895,14 +895,12 @@ class TCPReaderOneClient extends Thread {
return; return;
} }
System.out.println("waitingForStringMessages: " + waitingForStringMessages.size());
final List<byte[]> localWaitingList = new ArrayList<byte[]>(waitingForStringMessages); final List<byte[]> localWaitingList = new ArrayList<byte[]>(waitingForStringMessages);
waitingForStringMessages.clear(); waitingForStringMessages.clear();
for (final byte[] waitingMessage : localWaitingList) { for (final byte[] waitingMessage : localWaitingList) {
final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage);
final byte waitingMessageClazzId = buffer.get(); final byte waitingMessageClazzId = buffer.get();
System.out.println("waitingForStringMessages clazzId: " + waitingMessageClazzId);
switch (waitingMessageClazzId) { switch (waitingMessageClazzId) {
case HostApplicationMetaDataRecord.CLAZZ_ID: case HostApplicationMetaDataRecord.CLAZZ_ID:
readInHostApplicationMetaData(buffer); readInHostApplicationMetaData(buffer);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment