Skip to content
Snippets Groups Projects
Commit 107d0a7f authored by Florian Fittkau's avatar Florian Fittkau
Browse files

byte buffer tests

parent fa9c15e0
No related branches found
No related tags found
No related merge requests found
...@@ -16,7 +16,8 @@ public final class CountingThroughputFilter { ...@@ -16,7 +16,8 @@ public final class CountingThroughputFilter {
intervalSize = 1 * 1000 * 1000 * 1000; intervalSize = 1 * 1000 * 1000 * 1000;
} }
private void processEvent(final Object event, final long currentTime) { private void processEvent(final Object event, final long currentTime,
final int increment) {
final long startOfTimestampsInterval = computeFirstTimestampInInterval(currentTime); final long startOfTimestampsInterval = computeFirstTimestampInInterval(currentTime);
final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime); final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime);
...@@ -31,11 +32,15 @@ public final class CountingThroughputFilter { ...@@ -31,11 +32,15 @@ public final class CountingThroughputFilter {
currentCountForCurrentInterval = 0; currentCountForCurrentInterval = 0;
} }
currentCountForCurrentInterval++; currentCountForCurrentInterval += increment;
} }
public final void inputObjects(final Object object) { public final void inputObjects(final Object object) {
processEvent(object, System.currentTimeMillis() * 1000 * 1000); processEvent(object, System.currentTimeMillis() * 1000 * 1000, 1);
}
public final void inputObjectsCount(final int object) {
processEvent(object, System.currentTimeMillis() * 1000 * 1000, object);
} }
private long computeFirstTimestampInInterval(final long timestamp) { private long computeFirstTimestampInInterval(final long timestamp) {
......
...@@ -30,7 +30,7 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { ...@@ -30,7 +30,7 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>( private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(
1024); 1024);
private byte[] unreadBytes = null; private final byte[] unreadBytes = null;
private final RingBuffer<RecordEvent> ringBuffer; private final RingBuffer<RecordEvent> ringBuffer;
...@@ -50,24 +50,26 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { ...@@ -50,24 +50,26 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
@Override @Override
public void onEvent(final ByteArrayEvent event, final long sequence, public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception { final boolean endOfBatch) throws Exception {
final byte[] received = event.getValue(); counter.inputObjectsCount(event.getLength() / 28);
final int receivedLength = event.getLength();
// final byte[] received = event.getValue();
byte[] messages = received; // final int receivedLength = event.getLength();
int messagesLength = receivedLength; //
// byte[] messages = received;
if (unreadBytes != null) { // int messagesLength = receivedLength;
final int unreadBytesLength = unreadBytes.length; //
// if (unreadBytes != null) {
messagesLength += unreadBytesLength; // final int unreadBytesLength = unreadBytes.length;
messages = new byte[messagesLength]; //
// messagesLength += unreadBytesLength;
System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); // messages = new byte[messagesLength];
System.arraycopy(received, 0, messages, unreadBytesLength, //
receivedLength); // System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength);
} // System.arraycopy(received, 0, messages, unreadBytesLength,
// receivedLength);
unreadBytes = messagesfromByteArray(messages, messagesLength); // }
//
// unreadBytes = messagesfromByteArray(messages, messagesLength);
} }
private byte[] messagesfromByteArray(final byte[] b, final int readSize) { private byte[] messagesfromByteArray(final byte[] b, final int readSize) {
...@@ -268,10 +270,10 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { ...@@ -268,10 +270,10 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
private void putInRingBuffer(final IRecord record) { private void putInRingBuffer(final IRecord record) {
counter.inputObjects(record); counter.inputObjects(record);
final long hiseq = ringBuffer.next(); // final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq); // final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(record); // valueEvent.setValue(record);
ringBuffer.publish(hiseq); // ringBuffer.publish(hiseq);
} }
public void addToRegistry(final int key, final String value) { public void addToRegistry(final int key, final String value) {
......
package explorviz.hpc_monitoring.reader; package explorviz.hpc_monitoring.reader;
import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.InetSocketAddress;
import java.net.Socket; import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -12,20 +13,24 @@ import com.lmax.disruptor.RingBuffer; ...@@ -12,20 +13,24 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
public final class TCPReader { public final class TCPReader {
private static final int MESSAGE_BUFFER_SIZE = 65536; private static final int MESSAGE_BUFFER_SIZE = 65536 * 2;
private final int listeningPort; private final int listeningPort;
private ServerSocket serversocket; private ServerSocketChannel serversocket;
private boolean active = true; private boolean active = true;
private final RingBuffer<ByteArrayEvent> ringBuffer; private final RingBuffer<ByteArrayEvent> ringBuffer;
private final ByteBuffer buffer;
public TCPReader(final int listeningPort, public TCPReader(final int listeningPort,
final EventHandler<RecordEvent> endReceiver) final EventHandler<RecordEvent> endReceiver)
throws IllegalArgumentException { throws IllegalArgumentException {
this.listeningPort = listeningPort; this.listeningPort = listeningPort;
buffer = ByteBuffer.allocate(MESSAGE_BUFFER_SIZE);
final ExecutorService exec = Executors.newCachedThreadPool(); final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>(
ByteArrayEvent.EVENT_FACTORY, 4096, exec); ByteArrayEvent.EVENT_FACTORY, 4096, exec);
...@@ -42,18 +47,17 @@ public final class TCPReader { ...@@ -42,18 +47,17 @@ public final class TCPReader {
open(); open();
while (active) { while (active) {
// TODO only one connection! // TODO only one connection!
final Socket socket = serversocket.accept(); final SocketChannel socketChannel = serversocket.accept();
final BufferedInputStream bufferedInputStream = new BufferedInputStream(
socket.getInputStream(), MESSAGE_BUFFER_SIZE);
int readBytes = 0; int readBytes = 0;
byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; while ((readBytes = socketChannel.read(buffer)) != -1) {
while ((readBytes = bufferedInputStream.read(messages, 0, // final byte[] messages = new byte[MESSAGE_BUFFER_SIZE];
MESSAGE_BUFFER_SIZE)) != -1) { // System.arraycopy(buffer.array(), 0, messages, 0,
putInRingBuffer(messages, readBytes); // buffer.position());
messages = new byte[MESSAGE_BUFFER_SIZE]; putInRingBuffer(null, readBytes);
buffer.clear();
} }
socket.close(); serversocket.close();
} }
} catch (final IOException ex) { } catch (final IOException ex) {
System.out.println("Error in read() " + ex.toString()); System.out.println("Error in read() " + ex.toString());
...@@ -67,7 +71,8 @@ public final class TCPReader { ...@@ -67,7 +71,8 @@ public final class TCPReader {
} }
private void open() throws IOException { private void open() throws IOException {
serversocket = new ServerSocket(listeningPort); serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(listeningPort));
System.out.println("listening on port " + listeningPort); System.out.println("listening on port " + listeningPort);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment