Skip to content
Snippets Groups Projects
Commit 40af7764 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

WiP

parent 6a8700a3
No related branches found
No related tags found
No related merge requests found
...@@ -40,5 +40,6 @@ explorviz.live_trace_processing.tcp_reader_disruptor_size=16384 ...@@ -40,5 +40,6 @@ explorviz.live_trace_processing.tcp_reader_disruptor_size=16384
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=256 explorviz.live_trace_processing.trace_reconstruction_disruptor_size=256
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128 explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_reconstruction_timeout_in_sec=4
explorviz.live_trace_processing.trace_summarization_disruptor_size=64 explorviz.live_trace_processing.trace_summarization_disruptor_size=64
...@@ -15,17 +15,21 @@ import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; ...@@ -15,17 +15,21 @@ import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TraceReconstructionBuffer { class TraceReconstructionBuffer {
private static final int TIMEOUT_IN_SECONDS = 4;
private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE);
private int openEvents; private int openEvents;
private int updatedInThisPeriodCounter = TIMEOUT_IN_SECONDS; private final int TIMEOUT_IN_SECONDS;
private int updatedInThisPeriodCounter;
private int maxOrderIndex = -1; private int maxOrderIndex = -1;
public TraceReconstructionBuffer(final int traceTimeoutInSec) {
TIMEOUT_IN_SECONDS = traceTimeoutInSec;
updatedInThisPeriodCounter = TIMEOUT_IN_SECONDS;
}
public final void insertEvent(final AbstractEventRecord event) { public final void insertEvent(final AbstractEventRecord event) {
resetTimeoutCounter(); resetTimeoutCounter();
setMaxOrderIndex(event); setMaxOrderIndex(event);
......
...@@ -23,10 +23,13 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R ...@@ -23,10 +23,13 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
private final PipesMerger<IRecord> traceReconstructionMerger; private final PipesMerger<IRecord> traceReconstructionMerger;
private final int traceTimeoutInSec;
public TraceReconstructionFilter(final PipesMerger<IRecord> traceReconstructionMerger, public TraceReconstructionFilter(final PipesMerger<IRecord> traceReconstructionMerger,
final Queue<IRecord> receiverQueue) { final Queue<IRecord> receiverQueue, final int traceTimeoutInSec) {
super(receiverQueue, "Reconstructed traces/sec", 1000); super(receiverQueue, "Reconstructed traces/sec", 1000);
this.traceReconstructionMerger = traceReconstructionMerger; this.traceReconstructionMerger = traceReconstructionMerger;
this.traceTimeoutInSec = traceTimeoutInSec;
} }
@Override @Override
...@@ -61,7 +64,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R ...@@ -61,7 +64,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
private final TraceReconstructionBuffer getBufferForRecord(final AbstractEventRecord record) { private final TraceReconstructionBuffer getBufferForRecord(final AbstractEventRecord record) {
TraceReconstructionBuffer traceBuffer = traceIdAndHost2trace.get(record); TraceReconstructionBuffer traceBuffer = traceIdAndHost2trace.get(record);
if (traceBuffer == null) { if (traceBuffer == null) {
traceBuffer = new TraceReconstructionBuffer(); traceBuffer = new TraceReconstructionBuffer(traceTimeoutInSec);
traceIdAndHost2trace.put(record, traceBuffer); traceIdAndHost2trace.put(record, traceBuffer);
} }
return traceBuffer; return traceBuffer;
......
...@@ -26,7 +26,8 @@ public class FilterConfiguration { ...@@ -26,7 +26,8 @@ public class FilterConfiguration {
Constants.TCP_READER_DISRUPTOR_SIZE); Constants.TCP_READER_DISRUPTOR_SIZE);
new TraceReconstructionFilter(traceReconstructionMerger, new TraceReconstructionFilter(traceReconstructionMerger,
traceReductionConnector.registerProducer()).start(); traceReductionConnector.registerProducer(),
Constants.TRACE_RECONSTRUCTION_TIMEOUT_IN_SEC).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT, new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
10133), traceReconstructionMerger).read(); 10133), traceReconstructionMerger).read();
......
...@@ -11,7 +11,7 @@ public class TraceReconstructionBufferTest { ...@@ -11,7 +11,7 @@ public class TraceReconstructionBufferTest {
@Test @Test
public void testInsertEvent() throws Exception { public void testInsertEvent() throws Exception {
final TraceReconstructionBuffer traceReconstructionBuffer = new TraceReconstructionBuffer(); final TraceReconstructionBuffer traceReconstructionBuffer = new TraceReconstructionBuffer(5);
traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1, 0, 0, "test", traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1, 0, 0, "test",
"TestClazz", "", new HostApplicationMetaDataRecord("testSystem", "testIp", "TestClazz", "", new HostApplicationMetaDataRecord("testSystem", "testIp",
"testHost", "testApp", "Java"))); "testHost", "testApp", "Java")));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment