diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index 26467df60b85e4e605546524c6f270210e85e163..19bbac24ab80812c9578bb8c4e05eedb47642097 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -11,9 +11,6 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999 explorviz.live_trace_processing.writer_load_balancing_wait_time=20000 explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker-2 -explorviz.live_trace_processing.sending_buffer_size=65536 -explorviz.live_trace_processing.monitoring_controller_disruptor_size=32 - explorviz.live_trace_processing.tcp_reader_output_buffer_size=8192 explorviz.live_trace_processing.tcp_reader_disruptor_size=32 diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 1f8203e6a0210507ba63696565321b0ed630ef1d..1a6ecbfb7623e0440075753be97139e09c99b288 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import explorviz.live_trace_processing.Constants; +import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; @@ -18,11 +19,13 @@ class TraceReconstructionBuffer { private int openEvents; - private long maxLoggingTimestamp = -1; + private volatile boolean updatedInThisPeriod = true; + + private long lastBufferInsert = -1; private int maxOrderIndex = -1; public final void insertEvent(final AbstractOperationEventRecord event) { - setMaxLoggingTimestamp(event); + updatedInThisPeriod = true; final int orderIndex = setMaxOrderIndex(event); if (event instanceof BeforeOperationEventRecord) { @@ -39,15 +42,20 @@ class TraceReconstructionBuffer { events.add(event); } - public final long getMaxLoggingTimestamp() { - return maxLoggingTimestamp; + public boolean isUpdatedInThisPeriod() { + return updatedInThisPeriod; } - private final void setMaxLoggingTimestamp(final AbstractOperationEventRecord event) { - final long loggingTimestamp = event.getLoggingTimestamp(); - if (loggingTimestamp > maxLoggingTimestamp) { - maxLoggingTimestamp = loggingTimestamp; - } + public void setUpdatedInThisPeriod(final boolean updated) { + updatedInThisPeriod = updated; + } + + public final long getLastBufferInsert() { + return lastBufferInsert; + } + + public final void updateLastBufferInsert() { + lastBufferInsert = TimeProvider.getCurrentTimestamp(); } private final int setMaxOrderIndex(final AbstractOperationEventRecord event) { diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index fa5a6841e7d024ac5bd13b5e239a22e682ea556d..6c6917b7cdfaf7d1d917bdd34bec5dbf029655a4 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -76,9 +76,15 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { final TraceReconstructionBuffer traceBuffer = entry.getValue(); - if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { - deliver(traceBuffer.toTrace(false)); - traceIdsToRemove.add(entry.getKey()); + if (traceBuffer.isUpdatedInThisPeriod()) { + traceBuffer.setUpdatedInThisPeriod(false); + } else { + traceBuffer.updateLastBufferInsert(); + + if (traceBuffer.getLastBufferInsert() <= traceTimeout) { + deliver(traceBuffer.toTrace(false)); + traceIdsToRemove.add(entry.getKey()); + } } } diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java index 977ca2c97183a337f7fcf23572a6a19bcc34aca0..4975c418d1ed8d7a49c50d6e1ecba67ea6c73322 100644 --- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -15,10 +15,10 @@ public class FilterConfiguration { public static void configureAndStartFilters(final Configuration configuration, final ITraceSink sink) { final ITraceReduction traceReduction = new TracePatternSummarizationFilter( - TimeUnit.MILLISECONDS.toNanos(990), sink); + TimeUnit.SECONDS.toNanos(1), sink); final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter( - TimeUnit.SECONDS.toNanos(5), traceReduction); + TimeUnit.SECONDS.toNanos(4), traceReduction); new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), traceReconstruction).read(); diff --git a/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java b/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java index 3fae4a3ca75157e5fef771d9d0573cd1f7c2a8ae..bd1df03c04162347ff9b9e2054626f6b356d10b1 100644 --- a/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java +++ b/test/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBufferTest.java @@ -1,6 +1,6 @@ package explorviz.live_trace_processing.filter.reconstruction; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.junit.Test; @@ -16,7 +16,7 @@ public class TraceReconstructionBufferTest { traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1000, 1, 0, "test", new HostApplicationMetaDataRecord("testHost", "testApp"), new RuntimeStatisticInformation(1000))); - assertEquals(1000, traceReconstructionBuffer.getMaxLoggingTimestamp()); + assertTrue(true); // TODO } }