Skip to content
Snippets Groups Projects
Commit 0491b4ca authored by Christian Wulf's avatar Christian Wulf
Browse files

added throughput information to TCPReaderSink

parent e58e15e3
No related branches found
No related tags found
No related merge requests found
...@@ -11,3 +11,4 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n ...@@ -11,3 +11,4 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.variant.methodcallWithPorts.framework.core.level = FINE #teetime.variant.methodcallWithPorts.framework.core.level = FINE
#teetime.variant.methodcallWithPorts.stage.level = INFO #teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE
...@@ -22,6 +22,9 @@ import java.nio.ByteBuffer; ...@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
...@@ -51,6 +54,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -51,6 +54,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
private TCPStringReader tcpStringReader; private TCPStringReader tcpStringReader;
private final AtomicInteger counter = new AtomicInteger(0);
private final ScheduledThreadPoolExecutor executorService;
public TCPReaderSink() {
this.executorService = new ScheduledThreadPoolExecutor(1);
}
public final int getPort1() { public final int getPort1() {
return this.port1; return this.port1;
} }
...@@ -69,6 +79,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -69,6 +79,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
@Override @Override
public void onStart() { public void onStart() {
this.executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Records/s: " + TCPReaderSink.this.counter.getAndSet(0));
}
}, 0, 1, TimeUnit.SECONDS);
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start(); this.tcpStringReader.start();
super.onStart(); super.onStart();
...@@ -99,6 +116,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -99,6 +116,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp); record.setLoggingTimestamp(loggingTimestamp);
// this.send(record); // this.send(record);
this.counter.incrementAndGet();
} catch (final MonitoringRecordException ex) { } catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex); super.logger.error("Failed to create record.", ex);
} }
...@@ -127,10 +145,16 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -127,10 +145,16 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
} }
this.setReschedulable(false); this.setReschedulable(false);
this.tcpStringReader.terminate();
} }
} }
@Override
public void onIsPipelineHead() {
this.executorService.shutdown();
this.tcpStringReader.interrupt();
super.onIsPipelineHead();
}
/** /**
* *
* @author Jan Waller * @author Jan Waller
...@@ -145,22 +169,14 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -145,22 +169,14 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
private final int port; private final int port;
private final ILookup<String> stringRegistry; private final ILookup<String> stringRegistry;
private volatile boolean terminated = false; // NOPMD
private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) { public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port; this.port = port;
this.stringRegistry = stringRegistry; this.stringRegistry = stringRegistry;
} }
public void terminate() {
this.terminated = true;
this.readerThread.interrupt();
}
@Override @Override
public void run() { public void run() {
this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null; ServerSocketChannel serversocket = null;
try { try {
serversocket = ServerSocketChannel.open(); serversocket = ServerSocketChannel.open();
...@@ -171,7 +187,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -171,7 +187,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
// BEGIN also loop this one? // BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept(); final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while ((socketChannel.read(buffer) != -1) && (!this.terminated)) { while ((socketChannel.read(buffer) != -1)) {
buffer.flip(); buffer.flip();
try { try {
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
...@@ -201,6 +217,8 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> { ...@@ -201,6 +217,8 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
} }
} }
} }
LOG.debug("StringRegistryReader thread has finished.");
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment