diff --git a/conf/logging.properties b/conf/logging.properties index ff701c0055e0cbafe3e5dbf6c53849ef3b755be5..5f13ed5cdb89ca93fe341c97b9b64967cb7ca147 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -11,3 +11,4 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n #teetime.variant.methodcallWithPorts.framework.core.level = FINE #teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE +teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java index db20af71ea385a1d5ec32fe8ffbdfd6acc4402f0..1f244d8a332fce0a68f9a75b1bf27ee2610daed2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java @@ -22,6 +22,9 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; @@ -51,6 +54,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { private TCPStringReader tcpStringReader; + private final AtomicInteger counter = new AtomicInteger(0); + private final ScheduledThreadPoolExecutor executorService; + + public TCPReaderSink() { + this.executorService = new ScheduledThreadPoolExecutor(1); + } + public final int getPort1() { return this.port1; } @@ -69,6 +79,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { @Override public void onStart() { + this.executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + System.out.println("Records/s: " + TCPReaderSink.this.counter.getAndSet(0)); + } + }, 0, 1, TimeUnit.SECONDS); + this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); this.tcpStringReader.start(); super.onStart(); @@ -99,6 +116,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); // this.send(record); + this.counter.incrementAndGet(); } catch (final MonitoringRecordException ex) { super.logger.error("Failed to create record.", ex); } @@ -127,10 +145,16 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { } this.setReschedulable(false); - this.tcpStringReader.terminate(); } } + @Override + public void onIsPipelineHead() { + this.executorService.shutdown(); + this.tcpStringReader.interrupt(); + super.onIsPipelineHead(); + } + /** * * @author Jan Waller @@ -145,22 +169,14 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { private final int port; private final ILookup<String> stringRegistry; - private volatile boolean terminated = false; // NOPMD - private volatile Thread readerThread; public TCPStringReader(final int port, final ILookup<String> stringRegistry) { this.port = port; this.stringRegistry = stringRegistry; } - public void terminate() { - this.terminated = true; - this.readerThread.interrupt(); - } - @Override public void run() { - this.readerThread = Thread.currentThread(); ServerSocketChannel serversocket = null; try { serversocket = ServerSocketChannel.open(); @@ -171,7 +187,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { // BEGIN also loop this one? final SocketChannel socketChannel = serversocket.accept(); final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); - while ((socketChannel.read(buffer) != -1) && (!this.terminated)) { + while ((socketChannel.read(buffer) != -1)) { buffer.flip(); try { while (buffer.hasRemaining()) { @@ -201,6 +217,8 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { } } } + + LOG.debug("StringRegistryReader thread has finished."); } } }