From 0491b4ca4c7c8f2692b26dbabaf93c5201ce89e9 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sat, 12 Jul 2014 10:06:18 +0200
Subject: [PATCH] added throughput information to TCPReaderSink
---
conf/logging.properties | 1 +
.../examples/kiekerdays/TCPReaderSink.java | 38 ++++++++++++++-----
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/conf/logging.properties b/conf/logging.properties
index ff701c00..5f13ed5c 100644
--- a/conf/logging.properties
+++ b/conf/logging.properties
@@ -11,3 +11,4 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.variant.methodcallWithPorts.framework.core.level = FINE
#teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
+teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java
index db20af71..1f244d8a 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
@@ -51,6 +54,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
private TCPStringReader tcpStringReader;
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final ScheduledThreadPoolExecutor executorService;
+
+ public TCPReaderSink() {
+ this.executorService = new ScheduledThreadPoolExecutor(1);
+ }
+
public final int getPort1() {
return this.port1;
}
@@ -69,6 +79,13 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
@Override
public void onStart() {
+ this.executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ System.out.println("Records/s: " + TCPReaderSink.this.counter.getAndSet(0));
+ }
+ }, 0, 1, TimeUnit.SECONDS);
+
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
super.onStart();
@@ -99,6 +116,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
// this.send(record);
+ this.counter.incrementAndGet();
} catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex);
}
@@ -127,10 +145,16 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
}
this.setReschedulable(false);
- this.tcpStringReader.terminate();
}
}
+ @Override
+ public void onIsPipelineHead() {
+ this.executorService.shutdown();
+ this.tcpStringReader.interrupt();
+ super.onIsPipelineHead();
+ }
+
/**
*
* @author Jan Waller
@@ -145,22 +169,14 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
private final int port;
private final ILookup<String> stringRegistry;
- private volatile boolean terminated = false; // NOPMD
- private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
- public void terminate() {
- this.terminated = true;
- this.readerThread.interrupt();
- }
-
@Override
public void run() {
- this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
@@ -171,7 +187,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
- while ((socketChannel.read(buffer) != -1) && (!this.terminated)) {
+ while ((socketChannel.read(buffer) != -1)) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
@@ -201,6 +217,8 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
}
}
}
+
+ LOG.debug("StringRegistryReader thread has finished.");
}
}
}
--
GitLab