Skip to content
Snippets Groups Projects
Commit 98abb6bc authored by Florian Fittkau's avatar Florian Fittkau
Browse files

optimizations

parent d68fc103
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,6 @@
<attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/disruptor-3.2.0-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry kind="lib" path="lib/trove-3.0.3.jar"/>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="output" path="bin"/>
</classpath>
This diff is collapsed.
The Trove library is licensed under the Lesser GNU Public License,
which is included with the distribution in a file called LICENSE.txt.
Other license arrangements are possible, for a fee: contact
ericdf@users.sourceforge.net for terms/pricing.
The PrimeFinder and HashFunctions classes in Trove are subject to the
following license restrictions:
Copyright (c) 1999 CERN - European Organization for Nuclear Research.
Permission to use, copy, modify, distribute and sell this software and
its documentation for any purpose is hereby granted without fee,
provided that the above copyright notice appear in all copies and that
both that copyright notice and this permission notice appear in
supporting documentation. CERN makes no representations about the
suitability of this software for any purpose. It is provided "as is"
without expressed or implied warranty.
......@@ -35,9 +35,9 @@ public class TCPConnector implements EventHandler<ByteArrayEvent> {
socket.getOutputStream(), MESSAGE_BUFFER_SIZE);
}
public final void sendMessage(final byte[] message, final int length) {
public final void sendMessage(final byte[] message) {
try {
bufferedOutputStream.write(message, 0, length);
bufferedOutputStream.write(message);
// if (endOfBatch) {
// bufferedOutputStream.flush();
// }
......@@ -78,6 +78,6 @@ public class TCPConnector implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
sendMessage(event.getValue(), event.getLength());
sendMessage(event.getValue());
}
}
......@@ -80,8 +80,8 @@ public class TraceBuffer {
}
public final boolean isInvalid() {
return (((maxOrderIndex + 1) != events.size()) || events.isEmpty()
|| (openEvents != 0) || (traceMetadata == null) || damaged);
return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size())
|| events.isEmpty() || damaged);
}
public final Trace toTrace() {
......
package explorviz.hpc_monitoring.filter.reconstruction;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -7,6 +9,7 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
......@@ -16,30 +19,32 @@ import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import gnu.trove.iterator.TLongObjectIterator;
import gnu.trove.map.hash.TLongObjectHashMap;
public final class TraceReconstructionFilter implements
EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver {
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256;
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Reconstructed traces per second");
private final long maxTraceTimeout;
private final TLongObjectHashMap<TraceBuffer> traceId2trace = new TLongObjectHashMap<TraceBuffer>(
1024);
private final RingBuffer<RecordEvent> ringBuffer;
private final Map<Long, TraceBuffer> traceId2trace = new TreeMap<Long, TraceBuffer>();
private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE];
private int outputBufferIndex = 0;
@SuppressWarnings("unchecked")
public TraceReconstructionFilter(final long maxTraceTimeout,
final EventHandler<RecordEvent> endReceiver) {
this.maxTraceTimeout = maxTraceTimeout;
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 16384, exec);
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
RecordArrayEvent.EVENT_FACTORY, 32, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
@SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers);
......@@ -50,49 +55,71 @@ public final class TraceReconstructionFilter implements
@Override
public void periodicTimeSignal(final long timestamp) {
checkForTimeouts(timestamp);
checkForTimeouts(timestamp); // TODO comes from other thread -
// synchronize!
flushOutputBuffer();
}
private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace
.iterator(); iterator.hasNext(); iterator.advance()) {
final TraceBuffer traceBuffer = iterator.value();
if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
sendOutInvalidTrace(traceBuffer.toTrace());
iterator.remove();
}
}
// final long traceTimeout = timestamp - maxTraceTimeout;
// for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace
// .iterator(); iterator.hasNext(); iterator.advance()) {
// final TraceBuffer traceBuffer = iterator.value();
// if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
// sendOutInvalidTrace(traceBuffer.toTrace());
// iterator.remove();
// }
// }
}
private void sendOutValidTrace(final Trace trace) {
counter.inputObjects(trace);
putInRingBuffer(trace);
}
private void sendOutInvalidTrace(final Trace trace) {
// counter.inputObjects(trace);
// putInRingBuffer(trace); // TODO
System.out.println("Invalid trace: "
+ trace.getTraceEvents()[0].getTraceId());
}
private void putInRingBuffer(final IRecord record) {
private void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
synchronized (this) { // TODO remove
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) {
flushOutputBuffer();
}
}
}
private void flushOutputBuffer() {
synchronized (this) { // TODO remove
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(record);
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO
// object
// reusage?
outputBufferIndex = 0;
}
}
}
@Override
public void onEvent(final RecordEvent event, final long sequence,
public void onEvent(final RecordArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final IRecord record = event.getValue();
if (record instanceof AbstractOperationEvent) {
for (final IRecord record : event.getValues()) { // TODO save length in
// event
if (record != null) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(traceId,
final TraceBuffer traceBuffer = getBufferForTraceId(
abstractOperationEvent.getTraceId(),
event.getMetadata());
traceBuffer.insertEvent(abstractOperationEvent);
......@@ -102,13 +129,15 @@ public final class TraceReconstructionFilter implements
}
}
}
}
private TraceBuffer getBufferForTraceId(final long traceId,
final HostApplicationMetadata metadata) {
TraceBuffer traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) {
traceBuffer = new TraceBuffer();
traceBuffer.setTrace(metadata);
traceBuffer = new TraceBuffer(); // TODO dont create new - keep old
// ones and reset!
traceBuffer.setTrace(metadata); // TODO reuse...
traceId2trace.put(traceId, traceBuffer);
}
return traceBuffer;
......
......@@ -12,6 +12,7 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
......@@ -21,7 +22,7 @@ import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
public class TracePatternSummarizationFilter implements
EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver {
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
private final long maxCollectionDuration;
private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>(
......@@ -38,7 +39,7 @@ public class TracePatternSummarizationFilter implements
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 128, exec);
RecordEvent.EVENT_FACTORY, 256, exec);
@SuppressWarnings("unchecked")
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
......@@ -83,11 +84,14 @@ public class TracePatternSummarizationFilter implements
}
@Override
public void onEvent(final RecordEvent event, final long sequence,
public void onEvent(final RecordArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final IRecord value = event.getValue();
if (value instanceof Trace) {
insertIntoBuffer((Trace) value);
for (final IRecord record : event.getValues()) {
if (record != null) {
if (record instanceof Trace) {
insertIntoBuffer((Trace) record);
}
}
}
}
......@@ -119,11 +123,11 @@ public class TracePatternSummarizationFilter implements
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
// final int cmpHostnames = t1.getTraceMetadata().getHostname()
// .compareTo(t2.getTraceMetadata().getHostname());
// if (cmpHostnames != 0) {
// return cmpHostnames;
// }
// TODO deep check records
return 0;
......
......@@ -8,6 +8,8 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -15,6 +17,7 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter;
......@@ -23,53 +26,60 @@ import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import gnu.trove.map.hash.TIntObjectHashMap;
public final class TCPReader {
public final class TCPReader implements IPeriodicTimeSignalReceiver {
private static final int MESSAGE_BUFFER_SIZE = 131072;
private final int listeningPort;
private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384;
private ServerSocketChannel serversocket;
// Settings
private final int listeningPort;
private boolean active = true;
private final RingBuffer<RecordEvent> ringBuffer;
private ServerSocketChannel serversocket;
private HostApplicationMetadata hostApplicationMetadata;
private final ByteBuffer buffer;
// Buffers
private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE];
private int outputBufferIndex = 0;
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Records per second");
private final static TIntObjectHashMap<String> stringRegistry = new TIntObjectHashMap<String>(
64);
private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>();
private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(
1024);
private HostApplicationMetadata hostApplicationMetadata;
public TCPReader(final int listeningPort,
final EventHandler<RecordEvent> endReceiver)
throws IllegalArgumentException {
final EventHandler<RecordEvent> endReceiver) {
this.listeningPort = listeningPort;
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 32768, exec);
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
RecordArrayEvent.EVENT_FACTORY, 16, exec);
@SuppressWarnings("unchecked")
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TraceReconstructionFilter(1 * 1000 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeReader(1 * 1000, this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
flushOutputBuffer();
}
public void read() {
public final void read() {
try {
open();
while (active) {
// TODO only one connection!
final ByteBuffer buffer = ByteBuffer
.allocateDirect(MESSAGE_BUFFER_SIZE);
final SocketChannel socketChannel = serversocket.accept();
while ((socketChannel.read(buffer)) != -1) {
buffer.flip();
......@@ -89,27 +99,18 @@ public final class TCPReader {
}
}
private void open() throws IOException {
private final void open() throws IOException {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(listeningPort));
System.out.println("listening on port " + listeningPort);
}
private void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(message);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq);
}
public void terminate(final boolean error) {
public final void terminate(final boolean error) {
System.out.println("Shutdown of TCPReader requested.");
active = false;
}
public void messagesfromByteArray(final ByteBuffer buffer) {
private final void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
buffer.mark();
try {
......@@ -239,11 +240,39 @@ public final class TCPReader {
}
private void putInWaitingMessages(final byte[] message) {
private final void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
synchronized (this) { // TODO remove
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) {
flushOutputBuffer();
}
}
}
private void flushOutputBuffer() {
synchronized (this) { // TODO remove
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO
// object
// reusage?
outputBufferIndex = 0;
}
}
}
private final void putInWaitingMessages(final byte[] message) {
waitingForStringMessages.add(message);
}
private void checkWaitingMessages() {
private final void checkWaitingMessages() {
final List<byte[]> localWaitingList = new ArrayList<byte[]>();
for (final byte[] waitingMessage : waitingForStringMessages) {
localWaitingList.add(waitingMessage);
......@@ -272,7 +301,7 @@ public final class TCPReader {
}
}
public void addToRegistry(final int key, final String value) {
private final void addToRegistry(final int key, final String value) {
stringRegistry.put(key, value);
// System.out.println("put key " + key + " value " + value);
......@@ -280,7 +309,7 @@ public final class TCPReader {
checkWaitingMessages();
}
private String getStringFromRegistry(final int id) {
private final String getStringFromRegistry(final int id) {
return stringRegistry.get(id);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment