Skip to content
Snippets Groups Projects
Commit 92318842 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

refactoring

parent 98abb6bc
No related branches found
No related tags found
No related merge requests found
......@@ -7,10 +7,10 @@ import java.util.TreeSet;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
public class TraceBuffer {
private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator();
......
package explorviz.hpc_monitoring.filter.reconstruction;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -14,11 +15,11 @@ import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimeReader;
import explorviz.hpc_monitoring.reader.TimedReader;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
public final class TraceReconstructionFilter implements
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
......@@ -50,7 +51,7 @@ public final class TraceReconstructionFilter implements
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeReader(1 * 1000, this).start();
new TimedReader(1 * 1000, this).start();
}
@Override
......@@ -61,15 +62,14 @@ public final class TraceReconstructionFilter implements
}
private void checkForTimeouts(final long timestamp) {
// final long traceTimeout = timestamp - maxTraceTimeout;
// for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace
// .iterator(); iterator.hasNext(); iterator.advance()) {
// final TraceBuffer traceBuffer = iterator.value();
// if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
// sendOutInvalidTrace(traceBuffer.toTrace());
// iterator.remove();
// }
// }
final long traceTimeout = timestamp - maxTraceTimeout;
for (final Entry<Long, TraceBuffer> entry : traceId2trace.entrySet()) {
final TraceBuffer traceBuffer = entry.getValue();
if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
sendOutInvalidTrace(traceBuffer.toTrace());
// TODO remove from traceId2trace
}
}
}
private void sendOutValidTrace(final Trace trace) {
......
package explorviz.hpc_monitoring.filter.reduction;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
public class TraceAggregationBuffer {
private Trace accumulator;
......
......@@ -16,10 +16,10 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimeReader;
import explorviz.hpc_monitoring.reader.TimedReader;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
public class TracePatternSummarizationFilter implements
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
......@@ -48,7 +48,7 @@ public class TracePatternSummarizationFilter implements
disruptor.handleEventsWith(eventHandlers);
}
ringBuffer = disruptor.start();
new TimeReader(1 * 1000, this).start();
new TimedReader(1 * 1000, this).start();
}
@Override
......
......@@ -23,9 +23,9 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
public final class TCPReader implements IPeriodicTimeSignalReceiver {
private static final int MESSAGE_BUFFER_SIZE = 131072;
......@@ -65,12 +65,14 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeReader(1 * 1000, this).start();
new TimedReader(1 * 1000, this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
flushOutputBuffer();
synchronized (this) { // TODO remove
flushOutputBuffer();
}
}
public final void read() {
......@@ -252,24 +254,23 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
}
private void flushOutputBuffer() {
synchronized (this) { // TODO remove
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO
// object
// reusage?
outputBufferIndex = 0;
}
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO
// object
// reusage?
outputBufferIndex = 0;
}
}
private final void putInWaitingMessages(final byte[] message) {
waitingForStringMessages.add(message);
waitingForStringMessages.add(message); // TODO kill messages if too long
// in queue
}
private final void checkWaitingMessages() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment