Skip to content
Snippets Groups Projects
Commit 9d50ec5f authored by Florian Fittkau's avatar Florian Fittkau
Browse files

WiP

parent 6d2de765
Branches
Tags
No related merge requests found
...@@ -5,7 +5,6 @@ import java.util.List; ...@@ -5,7 +5,6 @@ import java.util.List;
import java.util.Stack; import java.util.Stack;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.event.AbstractAfterEventRecord; import explorviz.live_trace_processing.record.event.AbstractAfterEventRecord;
import explorviz.live_trace_processing.record.event.AbstractAfterFailedEventRecord; import explorviz.live_trace_processing.record.event.AbstractAfterFailedEventRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeEventRecord; import explorviz.live_trace_processing.record.event.AbstractBeforeEventRecord;
...@@ -16,18 +15,19 @@ import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; ...@@ -16,18 +15,19 @@ import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
class TraceReconstructionBuffer { class TraceReconstructionBuffer {
private static final int TIMEOUT_IN_SECONDS = 4;
private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>( private final List<AbstractEventRecord> events = new ArrayList<AbstractEventRecord>(
Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE);
private int openEvents; private int openEvents;
private volatile boolean updatedInThisPeriod = true; private int updatedInThisPeriodCounter = TIMEOUT_IN_SECONDS;
private long lastBufferInsert = -1;
private int maxOrderIndex = -1; private int maxOrderIndex = -1;
public final void insertEvent(final AbstractEventRecord event) { public final void insertEvent(final AbstractEventRecord event) {
updatedInThisPeriod = true; resetTimeoutCounter();
setMaxOrderIndex(event); setMaxOrderIndex(event);
if ((event instanceof AbstractBeforeEventRecord)) { if ((event instanceof AbstractBeforeEventRecord)) {
...@@ -47,20 +47,16 @@ class TraceReconstructionBuffer { ...@@ -47,20 +47,16 @@ class TraceReconstructionBuffer {
events.add(event); events.add(event);
} }
public boolean isUpdatedInThisPeriod() { public boolean isTimedout() {
return updatedInThisPeriod; return updatedInThisPeriodCounter <= 0;
}
public void setUpdatedInThisPeriod(final boolean updated) {
updatedInThisPeriod = updated;
} }
public final long getLastBufferInsert() { public void decreaseTimeoutCounter() {
return lastBufferInsert; updatedInThisPeriodCounter--;
} }
public final void updateLastBufferInsert() { public void resetTimeoutCounter() {
lastBufferInsert = TimeProvider.getCurrentTimestamp(); updatedInThisPeriodCounter = TIMEOUT_IN_SECONDS;
} }
private final int setMaxOrderIndex(final AbstractEventRecord event) { private final int setMaxOrderIndex(final AbstractEventRecord event) {
......
...@@ -18,18 +18,15 @@ import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecor ...@@ -18,18 +18,15 @@ import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecor
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter implements Runnable { public final class TraceReconstructionFilter extends AbstractFilter implements Runnable {
private final long maxTraceTimeout;
private final Map<AbstractEventRecord, TraceReconstructionBuffer> traceIdAndHost2trace = new TreeMap<AbstractEventRecord, TraceReconstructionBuffer>( private final Map<AbstractEventRecord, TraceReconstructionBuffer> traceIdAndHost2trace = new TreeMap<AbstractEventRecord, TraceReconstructionBuffer>(
new TraceIdAndHostComperator()); new TraceIdAndHostComperator());
private final PipesMerger<IRecord> traceReconstructionMerger; private final PipesMerger<IRecord> traceReconstructionMerger;
public TraceReconstructionFilter(final PipesMerger<IRecord> traceReconstructionMerger, public TraceReconstructionFilter(final PipesMerger<IRecord> traceReconstructionMerger,
final long maxTraceTimeout, final Queue<IRecord> receiverQueue) { final Queue<IRecord> receiverQueue) {
super(receiverQueue, "Reconstructed traces/sec", 1000); super(receiverQueue, "Reconstructed traces/sec", 1000);
this.traceReconstructionMerger = traceReconstructionMerger; this.traceReconstructionMerger = traceReconstructionMerger;
this.maxTraceTimeout = maxTraceTimeout;
} }
@Override @Override
...@@ -71,8 +68,6 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R ...@@ -71,8 +68,6 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
} }
private void checkForTimeouts(final long timestamp) { private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
System.out.println("Trace Recon Filter Size: " + traceIdAndHost2trace.size()); System.out.println("Trace Recon Filter Size: " + traceIdAndHost2trace.size());
final Iterator<Entry<AbstractEventRecord, TraceReconstructionBuffer>> iterator = traceIdAndHost2trace final Iterator<Entry<AbstractEventRecord, TraceReconstructionBuffer>> iterator = traceIdAndHost2trace
...@@ -81,18 +76,14 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R ...@@ -81,18 +76,14 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
while (iterator.hasNext()) { while (iterator.hasNext()) {
final Entry<AbstractEventRecord, TraceReconstructionBuffer> entry = iterator.next(); final Entry<AbstractEventRecord, TraceReconstructionBuffer> entry = iterator.next();
final TraceReconstructionBuffer traceBuffer = entry.getValue(); final TraceReconstructionBuffer traceBuffer = entry.getValue();
if (traceBuffer.isUpdatedInThisPeriod()) { if (!traceBuffer.isTimedout()) {
traceBuffer.setUpdatedInThisPeriod(false); traceBuffer.decreaseTimeoutCounter();
} else { } else {
traceBuffer.updateLastBufferInsert();
if (traceBuffer.getLastBufferInsert() <= traceTimeout) {
deliver(traceBuffer.toTrace(false)); deliver(traceBuffer.toTrace(false));
iterator.remove(); iterator.remove();
} }
} }
} }
}
private void terminate() { private void terminate() {
for (final TraceReconstructionBuffer entry : traceIdAndHost2trace.values()) { for (final TraceReconstructionBuffer entry : traceIdAndHost2trace.values()) {
......
...@@ -21,6 +21,8 @@ import explorviz.live_trace_processing.record.trace.Trace; ...@@ -21,6 +21,8 @@ import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.record.trace.TraceComperator; import explorviz.live_trace_processing.record.trace.TraceComperator;
public class TracesSummarizationFilter extends AbstractFilter { public class TracesSummarizationFilter extends AbstractFilter {
private static final String INVALID_TRACE = "invalid trace...";
private final long maxCollectionDuration; private final long maxCollectionDuration;
private final Map<Trace, TracesSummarizationBuffer> trace2buffer = new TreeMap<Trace, TracesSummarizationBuffer>( private final Map<Trace, TracesSummarizationBuffer> trace2buffer = new TreeMap<Trace, TracesSummarizationBuffer>(
...@@ -50,7 +52,7 @@ public class TracesSummarizationFilter extends AbstractFilter { ...@@ -50,7 +52,7 @@ public class TracesSummarizationFilter extends AbstractFilter {
insertIntoBuffer(trace); insertIntoBuffer(trace);
} else { } else {
// trace with remote records or invalid trace cant be reduced // trace with remote records or invalid trace cant be reduced
System.out.println("invalid trace... size:" + trace.getTraceEvents().size()); System.out.println(INVALID_TRACE);
// System.out.println("invalid trace... trace:" + // System.out.println("invalid trace... trace:" +
// trace.toString()); // trace.toString());
makeTraceElementsAccumulator(trace); makeTraceElementsAccumulator(trace);
......
...@@ -25,7 +25,7 @@ public class FilterConfiguration { ...@@ -25,7 +25,7 @@ public class FilterConfiguration {
final PipesMerger<IRecord> traceReconstructionMerger = new PipesMerger<IRecord>( final PipesMerger<IRecord> traceReconstructionMerger = new PipesMerger<IRecord>(
Constants.TCP_READER_DISRUPTOR_SIZE); Constants.TCP_READER_DISRUPTOR_SIZE);
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(5), new TraceReconstructionFilter(traceReconstructionMerger,
traceReductionConnector.registerProducer()).start(); traceReductionConnector.registerProducer()).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT, new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
......
...@@ -38,7 +38,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver { ...@@ -38,7 +38,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
@Override @Override
public void periodicTimeSignal(final long timestamp) { public void periodicTimeSignal(final long timestamp) {
final TimedPeriodRecord periodRecord = new TimedPeriodRecord(); final TimedPeriodRecord periodRecord = new TimedPeriodRecord();
while (!periodicSignalQueue.offer(periodRecord)) { while ((periodicSignalQueue != null) && !periodicSignalQueue.offer(periodRecord)) {
try { try {
Thread.sleep(1); Thread.sleep(1);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment