diff --git a/src/explorviz/live_trace_processing/filter/counting/IRecordCounting.java b/src/explorviz/live_trace_processing/filter/counting/IRecordCounting.java new file mode 100644 index 0000000000000000000000000000000000000000..6b081ec1397874bd1b83f6bd8f6701e34c6bf436 --- /dev/null +++ b/src/explorviz/live_trace_processing/filter/counting/IRecordCounting.java @@ -0,0 +1,7 @@ +package explorviz.live_trace_processing.filter.counting; + +import explorviz.live_trace_processing.filter.IPipeReceiver; + +public interface IRecordCounting extends IPipeReceiver { + +} diff --git a/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..48a5dd3758ff061f6097deceac9f1b5839ba2ceb --- /dev/null +++ b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java @@ -0,0 +1,29 @@ +package explorviz.live_trace_processing.filter.counting; + +import explorviz.live_trace_processing.filter.AbstractFilter; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.event.AbstractEventRecord; +import explorviz.live_trace_processing.record.trace.Trace; + +public class RecordCountingFilter extends AbstractFilter implements IRecordCounting { + + public RecordCountingFilter(final IPipeReceiver receiver) { + super(receiver, 16, 64, "Records/sec"); + counter.setEnabled(false); + } + + @Override + public void processRecord(final IRecord record) { + if (record instanceof Trace) { + final Trace trace = (Trace) record; + if (trace.isValid()) { + counter.inputObjectsCount(trace.getTraceEvents().size()); + } + } else if (record instanceof AbstractEventRecord) { + counter.inputObjectsCount(1); + } else { + deliver(record); + } + } +} diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 9dce7c921118e3cb2d5c787cc80568d65d817fbc..39c6425f7839be0e62992932434627a71651b537 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -39,8 +39,9 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I traceBuffer.insertEvent(abstractOperationEvent); if (traceBuffer.isFinished()) { - deliver(traceBuffer.toTrace(true)); + final Trace trace = traceBuffer.toTrace(true); + deliver(trace); traceId2trace.remove(traceId); } } else if (record instanceof Trace) { diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index 90b0fbed77f77d200e3298df6071cd7ea4278cde..5a820a3c3feea18179493a95afbef4a4186b8b77 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -27,7 +27,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter implements I public TracePatternSummarizationFilter(final long maxCollectionDuration, final IPipeReceiver sinkReceiver) { super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE, - Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces / sec"); + Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec"); this.maxCollectionDuration = maxCollectionDuration; } diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java index 8590ac5e43679dbf9d4fe89279dd0f1689afbec0..cbce1f656724f70c10506f968e70a049ea6626dc 100644 --- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -5,6 +5,8 @@ import java.util.concurrent.TimeUnit; import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.ConfigurationFactory; import explorviz.live_trace_processing.filter.ITraceSink; +import explorviz.live_trace_processing.filter.counting.IRecordCounting; +import explorviz.live_trace_processing.filter.counting.RecordCountingFilter; import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction; import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; import explorviz.live_trace_processing.filter.reduction.ITraceReduction; @@ -14,8 +16,10 @@ import explorviz.live_trace_processing.reader.TCPReader; public class FilterConfiguration { public static void configureAndStartFilters(final Configuration configuration, final ITraceSink sink) { + final IRecordCounting recordCounting = new RecordCountingFilter(sink); + final ITraceReduction traceReduction = new TracePatternSummarizationFilter( - TimeUnit.MILLISECONDS.toNanos(990), sink); + TimeUnit.MILLISECONDS.toNanos(990), recordCounting); final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter( TimeUnit.SECONDS.toNanos(4), traceReduction);