Skip to content
Snippets Groups Projects
Commit 7de0baaf authored by Florian Fittkau's avatar Florian Fittkau
Browse files

new flush buffer system

parent d52ac95d
Branches master
Tags
No related merge requests found
......@@ -16,15 +16,15 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimeSignalReader;
import explorviz.hpc_monitoring.reader.TimeProvider;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.TerminateRecord;
import explorviz.hpc_monitoring.record.TimedPeriodRecord;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent>,
IPeriodicTimeSignalReceiver {
public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent> {
private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256;
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
......@@ -51,17 +51,12 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray
eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeSignalReader(1 * 1000, this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
synchronized (this) {
private void periodicTimeSignal(final long timestamp) {
checkForTimeouts(timestamp);
flushOutputBuffer();
}
}
private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
......@@ -85,20 +80,18 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray
}
private void sendOutInvalidTrace(final Trace trace) {
// putInRingBuffer(trace); // TODO
putInRingBuffer(trace);
System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId());
}
private void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
synchronized (this) {
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) {
flushOutputBuffer();
}
}
}
private void flushOutputBuffer() {
if (outputBufferIndex > 0) {
......@@ -121,6 +114,7 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray
for (int i = 0; i < valuesLength; i++) {
final IRecord record = values[i];
if (record instanceof AbstractOperationEvent) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
......@@ -132,6 +126,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray
traceId2trace.remove(traceId);
sendOutValidTrace(traceBuffer.toTrace());
}
} else if (record instanceof TimedPeriodRecord) {
periodicTimeSignal(TimeProvider.getCurrentTimestamp());
} else if (record instanceof TerminateRecord) {
terminate();
}
}
}
......@@ -146,12 +145,10 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray
return traceBuffer;
}
public void terminate() {
synchronized (this) {
private void terminate() {
for (final TraceBuffer entry : traceId2trace.values()) {
sendOutInvalidTrace(entry.toTrace());
}
traceId2trace.clear();
}
}
\ No newline at end of file
}
\ No newline at end of file
......@@ -14,14 +14,13 @@ import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimeProvider;
import explorviz.hpc_monitoring.reader.TimeSignalReader;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.TerminateRecord;
import explorviz.hpc_monitoring.record.TimedPeriodRecord;
import explorviz.hpc_monitoring.record.trace.Trace;
public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent>,
IPeriodicTimeSignalReceiver {
public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent> {
private final long maxCollectionDuration;
private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>(
......@@ -47,7 +46,6 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray
disruptor.handleEventsWith(eventHandlers);
}
ringBuffer = disruptor.start();
new TimeSignalReader(1 * 1000, this).start();
}
@Override
......@@ -56,10 +54,14 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray
final IRecord[] values = event.getValues();
final int valuesLength = event.getValuesLength();
synchronized (this) {
for (int i = 0; i < valuesLength; i++) {
final IRecord record = values[i];
if (record instanceof Trace) {
insertIntoBuffer((Trace) record);
} else if (record instanceof TimedPeriodRecord) {
periodicTimeSignal(TimeProvider.getCurrentTimestamp());
} else if (record instanceof TerminateRecord) {
terminate();
}
}
}
......@@ -73,12 +75,9 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray
traceAggregationBuffer.insertTrace(trace);
}
@Override
public void periodicTimeSignal(final long timestamp) {
synchronized (this) {
private void periodicTimeSignal(final long timestamp) {
processTimeoutQueue(timestamp);
}
}
private void processTimeoutQueue(final long timestamp) {
final long bufferTimeout = timestamp - maxCollectionDuration;
......@@ -103,7 +102,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray
ringBuffer.publish(hiseq);
}
public void terminate(final boolean error) {
private void terminate() {
for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) {
sendOutTrace(traceBuffer.getAggregatedTrace());
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment