Skip to content
Snippets Groups Projects
Commit d4b7cc36 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

Records per second: 28,60 millions

parent 3effe994
No related branches found
No related tags found
No related merge requests found
......@@ -11,4 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.worker.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G"/>
</launchConfiguration>
......@@ -6,9 +6,9 @@ import java.net.Socket;
import com.lmax.disruptor.EventHandler;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.Byte32ArrayEvent;
public class TCPConnector implements EventHandler<ByteArrayEvent> {
public class TCPConnector implements EventHandler<Byte32ArrayEvent> {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private String providerUrl;
......@@ -76,7 +76,7 @@ public class TCPConnector implements EventHandler<ByteArrayEvent> {
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
public void onEvent(final Byte32ArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
sendMessage(event.getValue(), event.getLength());
}
......
......@@ -10,10 +10,9 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.Byte65536ArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
......@@ -21,7 +20,7 @@ import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import gnu.trove.map.hash.TIntObjectHashMap;
public class MessageDistributer implements EventHandler<ByteArrayEvent> {
public class MessageDistributer implements EventHandler<Byte65536ArrayEvent> {
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Records per second");
......@@ -42,15 +41,11 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 32768, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TraceReconstructionFilter(
1 * 1000 * 1000 * 1000, endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
public void onEvent(final Byte65536ArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final byte[] received = event.getValue();
final int receivedLength = event.getLength();
......@@ -275,10 +270,10 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
private void putInRingBuffer(final IRecord record) {
counter.inputObjects(record);
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(record);
ringBuffer.publish(hiseq);
// final long hiseq = ringBuffer.next();
// final RecordEvent valueEvent = ringBuffer.get(hiseq);
// valueEvent.setValue(record);
// ringBuffer.publish(hiseq);
}
public void addToRegistry(final int key, final String value) {
......
......@@ -12,18 +12,18 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.Byte65536ArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
public final class TCPReader {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private static final int MESSAGE_BUFFER_SIZE = 131072;
private final int listeningPort;
private ServerSocketChannel serversocket;
private boolean active = true;
private final RingBuffer<ByteArrayEvent> ringBuffer;
private final RingBuffer<Byte65536ArrayEvent> ringBuffer;
private final ByteBuffer buffer;
......@@ -35,11 +35,11 @@ public final class TCPReader {
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>(
ByteArrayEvent.EVENT_FACTORY, 4096, exec);
final Disruptor<Byte65536ArrayEvent> disruptor = new Disruptor<Byte65536ArrayEvent>(
Byte65536ArrayEvent.EVENT_FACTORY, 32, exec);
@SuppressWarnings("unchecked")
final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1];
final EventHandler<Byte65536ArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new MessageDistributer(endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
......@@ -81,7 +81,7 @@ public final class TCPReader {
private void putInRingBuffer(final byte[] messages, final int readBytes) {
final long hiseq = ringBuffer.next();
final ByteArrayEvent valueEvent = ringBuffer.get(hiseq);
final Byte65536ArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(messages);
valueEvent.setLength(readBytes);
ringBuffer.publish(hiseq);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment