diff --git a/Start Worker.launch b/Start Worker.launch index 661ab2a7bea5920411817a8d393f11c5d075daf1..7e358d3fa62c5a5cc236d6e6a6c8987feb0f16c8 100644 --- a/Start Worker.launch +++ b/Start Worker.launch @@ -11,4 +11,5 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.worker.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G"/> </launchConfiguration> diff --git a/src/explorviz/hpc_monitoring/connector/TCPConnector.java b/src/explorviz/hpc_monitoring/connector/TCPConnector.java index f28ea5c19a1bd399b63dea0f97a37e30ee9a74ee..5a58a081e5e87b58dbac33f39b221764ef8b9229 100644 --- a/src/explorviz/hpc_monitoring/connector/TCPConnector.java +++ b/src/explorviz/hpc_monitoring/connector/TCPConnector.java @@ -6,9 +6,9 @@ import java.net.Socket; import com.lmax.disruptor.EventHandler; -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; +import explorviz.hpc_monitoring.disruptor.Byte32ArrayEvent; -public class TCPConnector implements EventHandler<ByteArrayEvent> { +public class TCPConnector implements EventHandler<Byte32ArrayEvent> { private static final int MESSAGE_BUFFER_SIZE = 65536; private String providerUrl; @@ -76,7 +76,7 @@ public class TCPConnector implements EventHandler<ByteArrayEvent> { } @Override - public void onEvent(final ByteArrayEvent event, final long sequence, + public void onEvent(final Byte32ArrayEvent event, final long sequence, final boolean endOfBatch) throws Exception { sendMessage(event.getValue(), event.getLength()); } diff --git a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java index 3d6a6ad80d6aa633469ef292d383c611f4befc6a..4257f65da51ec0b8a39c944dcd94c09514012480 100644 --- a/src/explorviz/hpc_monitoring/reader/MessageDistributer.java +++ b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java @@ -10,10 +10,9 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.byteaccess.UnsafeBits; -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; +import explorviz.hpc_monitoring.disruptor.Byte65536ArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; -import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TraceMetadata; import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; @@ -21,7 +20,7 @@ import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; import gnu.trove.map.hash.TIntObjectHashMap; -public class MessageDistributer implements EventHandler<ByteArrayEvent> { +public class MessageDistributer implements EventHandler<Byte65536ArrayEvent> { private static final CountingThroughputFilter counter = new CountingThroughputFilter( "Records per second"); @@ -42,15 +41,11 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( RecordEvent.EVENT_FACTORY, 32768, exec); - final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter( - 1 * 1000 * 1000 * 1000, endReceiver); - disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @Override - public void onEvent(final ByteArrayEvent event, final long sequence, + public void onEvent(final Byte65536ArrayEvent event, final long sequence, final boolean endOfBatch) throws Exception { final byte[] received = event.getValue(); final int receivedLength = event.getLength(); @@ -275,10 +270,10 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> { private void putInRingBuffer(final IRecord record) { counter.inputObjects(record); - final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValue(record); - ringBuffer.publish(hiseq); + // final long hiseq = ringBuffer.next(); + // final RecordEvent valueEvent = ringBuffer.get(hiseq); + // valueEvent.setValue(record); + // ringBuffer.publish(hiseq); } public void addToRegistry(final int key, final String value) { diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 2bc2861a8a405e111da4da8e932766f6143b09da..da5693682ed18f06233ee25ea2b7a908dde4c134 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -12,18 +12,18 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; -import explorviz.hpc_monitoring.disruptor.ByteArrayEvent; +import explorviz.hpc_monitoring.disruptor.Byte65536ArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; public final class TCPReader { - private static final int MESSAGE_BUFFER_SIZE = 65536; + private static final int MESSAGE_BUFFER_SIZE = 131072; private final int listeningPort; private ServerSocketChannel serversocket; private boolean active = true; - private final RingBuffer<ByteArrayEvent> ringBuffer; + private final RingBuffer<Byte65536ArrayEvent> ringBuffer; private final ByteBuffer buffer; @@ -35,11 +35,11 @@ public final class TCPReader { buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( - ByteArrayEvent.EVENT_FACTORY, 4096, exec); + final Disruptor<Byte65536ArrayEvent> disruptor = new Disruptor<Byte65536ArrayEvent>( + Byte65536ArrayEvent.EVENT_FACTORY, 32, exec); @SuppressWarnings("unchecked") - final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; + final EventHandler<Byte65536ArrayEvent>[] eventHandlers = new EventHandler[1]; eventHandlers[0] = new MessageDistributer(endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); @@ -81,7 +81,7 @@ public final class TCPReader { private void putInRingBuffer(final byte[] messages, final int readBytes) { final long hiseq = ringBuffer.next(); - final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); + final Byte65536ArrayEvent valueEvent = ringBuffer.get(hiseq); valueEvent.setValue(messages); valueEvent.setLength(readBytes); ringBuffer.publish(hiseq);