From 92318842b213de79b36cbb115fd4c167413bd36e Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Sun, 22 Sep 2013 16:14:20 +0200 Subject: [PATCH] refactoring --- .../filter/reconstruction/TraceBuffer.java | 8 ++-- .../TraceReconstructionFilter.java | 24 ++++++------ .../reduction/TraceAggregationBuffer.java | 2 +- .../TracePatternSummarizationFilter.java | 6 +-- .../hpc_monitoring/reader/TCPReader.java | 39 ++++++++++--------- 5 files changed, 40 insertions(+), 39 deletions(-) diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java index 9e279a0..adbffc3 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -7,10 +7,10 @@ import java.util.TreeSet; import explorviz.hpc_monitoring.record.HostApplicationMetadata; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; public class TraceBuffer { private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index e8950c2..d76e7f6 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -1,6 +1,7 @@ package explorviz.hpc_monitoring.filter.reconstruction; import java.util.Map; +import java.util.Map.Entry; import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,11 +15,11 @@ import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; -import explorviz.hpc_monitoring.reader.TimeReader; +import explorviz.hpc_monitoring.reader.TimedReader; import explorviz.hpc_monitoring.record.HostApplicationMetadata; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver { @@ -50,7 +51,7 @@ public final class TraceReconstructionFilter implements disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - new TimeReader(1 * 1000, this).start(); + new TimedReader(1 * 1000, this).start(); } @Override @@ -61,15 +62,14 @@ public final class TraceReconstructionFilter implements } private void checkForTimeouts(final long timestamp) { - // final long traceTimeout = timestamp - maxTraceTimeout; - // for (final TLongObjectIterator<TraceBuffer> iterator = traceId2trace - // .iterator(); iterator.hasNext(); iterator.advance()) { - // final TraceBuffer traceBuffer = iterator.value(); - // if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { - // sendOutInvalidTrace(traceBuffer.toTrace()); - // iterator.remove(); - // } - // } + final long traceTimeout = timestamp - maxTraceTimeout; + for (final Entry<Long, TraceBuffer> entry : traceId2trace.entrySet()) { + final TraceBuffer traceBuffer = entry.getValue(); + if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { + sendOutInvalidTrace(traceBuffer.toTrace()); + // TODO remove from traceId2trace + } + } } private void sendOutValidTrace(final Trace trace) { diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java index cb148ab..80e97a5 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java @@ -1,7 +1,7 @@ package explorviz.hpc_monitoring.filter.reduction; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; public class TraceAggregationBuffer { private Trace accumulator; diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index e0956bd..2ec59f9 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -16,10 +16,10 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver; -import explorviz.hpc_monitoring.reader.TimeReader; +import explorviz.hpc_monitoring.reader.TimedReader; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver { @@ -48,7 +48,7 @@ public class TracePatternSummarizationFilter implements disruptor.handleEventsWith(eventHandlers); } ringBuffer = disruptor.start(); - new TimeReader(1 * 1000, this).start(); + new TimedReader(1 * 1000, this).start(); } @Override diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 5b05403..4ffd104 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -23,9 +23,9 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; import explorviz.hpc_monitoring.record.HostApplicationMetadata; import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; +import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; public final class TCPReader implements IPeriodicTimeSignalReceiver { private static final int MESSAGE_BUFFER_SIZE = 131072; @@ -65,12 +65,14 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); - new TimeReader(1 * 1000, this).start(); + new TimedReader(1 * 1000, this).start(); } @Override public void periodicTimeSignal(final long timestamp) { - flushOutputBuffer(); + synchronized (this) { // TODO remove + flushOutputBuffer(); + } } public final void read() { @@ -252,24 +254,23 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { } private void flushOutputBuffer() { - synchronized (this) { // TODO remove - if (outputBufferIndex > 0) { - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - valueEvent.setValues(outputBuffer); - valueEvent.setMetadata(hostApplicationMetadata); - ringBuffer.publish(hiseq); - - outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO - // object - // reusage? - outputBufferIndex = 0; - } + if (outputBufferIndex > 0) { + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValues(outputBuffer); + valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); + + outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO + // object + // reusage? + outputBufferIndex = 0; } } private final void putInWaitingMessages(final byte[] message) { - waitingForStringMessages.add(message); + waitingForStringMessages.add(message); // TODO kill messages if too long + // in queue } private final void checkWaitingMessages() { -- GitLab