From 7de0baaf162bc712aab7fa467bdb4395970723ac Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Fri, 11 Oct 2013 17:49:50 +0200 Subject: [PATCH] new flush buffer system --- .../TraceReconstructionFilter.java | 65 +++++++++---------- .../TracePatternSummarizationFilter.java | 27 ++++---- 2 files changed, 44 insertions(+), 48 deletions(-) diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index d92be65..e48986f 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -16,15 +16,15 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; -import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; -import explorviz.hpc_monitoring.reader.TimeSignalReader; +import explorviz.hpc_monitoring.reader.TimeProvider; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.TerminateRecord; +import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; import explorviz.hpc_monitoring.record.trace.Trace; -public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent>, - IPeriodicTimeSignalReceiver { +public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent> { private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256; private static final CountingThroughputFilter counter = new CountingThroughputFilter( @@ -51,16 +51,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - - new TimeSignalReader(1 * 1000, this).start(); } - @Override - public void periodicTimeSignal(final long timestamp) { - synchronized (this) { - checkForTimeouts(timestamp); - flushOutputBuffer(); - } + private void periodicTimeSignal(final long timestamp) { + checkForTimeouts(timestamp); + flushOutputBuffer(); } private void checkForTimeouts(final long timestamp) { @@ -85,18 +80,16 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray } private void sendOutInvalidTrace(final Trace trace) { - // putInRingBuffer(trace); // TODO + putInRingBuffer(trace); System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId()); } private void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - synchronized (this) { - outputBuffer[outputBufferIndex++] = message; + outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - flushOutputBuffer(); - } + if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + flushOutputBuffer(); } } @@ -121,16 +114,22 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray for (int i = 0; i < valuesLength; i++) { final IRecord record = values[i]; - final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); - - final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId( - abstractOperationEvent.getTraceId(), event.getMetadata()); - traceBuffer.insertEvent(abstractOperationEvent); - - if (traceBuffer.isFinished()) { - traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace()); + if (record instanceof AbstractOperationEvent) { + final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + + final long traceId = abstractOperationEvent.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId( + abstractOperationEvent.getTraceId(), event.getMetadata()); + traceBuffer.insertEvent(abstractOperationEvent); + + if (traceBuffer.isFinished()) { + traceId2trace.remove(traceId); + sendOutValidTrace(traceBuffer.toTrace()); + } + } else if (record instanceof TimedPeriodRecord) { + periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + } else if (record instanceof TerminateRecord) { + terminate(); } } } @@ -146,12 +145,10 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray return traceBuffer; } - public void terminate() { - synchronized (this) { - for (final TraceBuffer entry : traceId2trace.values()) { - sendOutInvalidTrace(entry.toTrace()); - } - traceId2trace.clear(); + private void terminate() { + for (final TraceBuffer entry : traceId2trace.values()) { + sendOutInvalidTrace(entry.toTrace()); } + traceId2trace.clear(); } } \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 0820bd9..9274e51 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -14,14 +14,13 @@ import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; -import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; import explorviz.hpc_monitoring.reader.TimeProvider; -import explorviz.hpc_monitoring.reader.TimeSignalReader; import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.TerminateRecord; +import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.trace.Trace; -public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent>, - IPeriodicTimeSignalReceiver { +public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent> { private final long maxCollectionDuration; private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>( @@ -47,7 +46,6 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray disruptor.handleEventsWith(eventHandlers); } ringBuffer = disruptor.start(); - new TimeSignalReader(1 * 1000, this).start(); } @Override @@ -56,10 +54,14 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray final IRecord[] values = event.getValues(); final int valuesLength = event.getValuesLength(); - synchronized (this) { - for (int i = 0; i < valuesLength; i++) { - final IRecord record = values[i]; + for (int i = 0; i < valuesLength; i++) { + final IRecord record = values[i]; + if (record instanceof Trace) { insertIntoBuffer((Trace) record); + } else if (record instanceof TimedPeriodRecord) { + periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + } else if (record instanceof TerminateRecord) { + terminate(); } } } @@ -73,11 +75,8 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray traceAggregationBuffer.insertTrace(trace); } - @Override - public void periodicTimeSignal(final long timestamp) { - synchronized (this) { - processTimeoutQueue(timestamp); - } + private void periodicTimeSignal(final long timestamp) { + processTimeoutQueue(timestamp); } private void processTimeoutQueue(final long timestamp) { @@ -103,7 +102,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray ringBuffer.publish(hiseq); } - public void terminate(final boolean error) { + private void terminate() { for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { sendOutTrace(traceBuffer.getAggregatedTrace()); } -- GitLab