Skip to content
Snippets Groups Projects
Commit 536a86d0 authored by Christian Wulf's avatar Christian Wulf
Browse files

adapted to new trace processing workflow

parent f8e90850
No related branches found
No related tags found
No related merge requests found
...@@ -10,8 +10,8 @@ import kiekerdays.StopMessage; ...@@ -10,8 +10,8 @@ import kiekerdays.StopMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
...@@ -25,12 +25,12 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -25,12 +25,12 @@ public class TcpTraceReconstructionActor extends UntypedActor {
private long maxTraceTimeout = Long.MAX_VALUE; private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1; private long maxEncounteredLoggingTimestamp = -1;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace; private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private ActorRef validTraceReceiver; private final ActorRef validTraceReceiver;
private ActorRef invalidTraceReceiver; private final ActorRef invalidTraceReceiver;
public TcpTraceReconstructionActor(ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace, public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
Props validTraceReceiverProps, Props invalidTraceReceiverProps) { final Props validTraceReceiverProps, final Props invalidTraceReceiverProps) {
super(); super();
this.traceId2trace = traceId2trace; this.traceId2trace = traceId2trace;
this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
...@@ -38,7 +38,7 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -38,7 +38,7 @@ public class TcpTraceReconstructionActor extends UntypedActor {
} }
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (message instanceof IFlowRecord) { if (message instanceof IFlowRecord) {
final Long traceId = this.reconstructTrace((IFlowRecord) message); final Long traceId = this.reconstructTrace((IFlowRecord) message);
if (traceId != null) { if (traceId != null) {
...@@ -57,12 +57,12 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -57,12 +57,12 @@ public class TcpTraceReconstructionActor extends UntypedActor {
Long traceId = null; Long traceId = null;
if (record instanceof TraceMetadata) { if (record instanceof TraceMetadata) {
traceId = ((TraceMetadata) record).getTraceId(); traceId = ((TraceMetadata) record).getTraceId();
TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId); EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.setTrace((TraceMetadata) record); traceBuffer.setTrace((TraceMetadata) record);
} else if (record instanceof AbstractTraceEvent) { } else if (record instanceof AbstractTraceEvent) {
traceId = ((AbstractTraceEvent) record).getTraceId(); traceId = ((AbstractTraceEvent) record).getTraceId();
TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId); EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.insertEvent((AbstractTraceEvent) record); traceBuffer.insertEvent((AbstractTraceEvent) record);
} }
...@@ -71,7 +71,7 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -71,7 +71,7 @@ public class TcpTraceReconstructionActor extends UntypedActor {
} }
private void put(final Long traceId, final boolean onlyIfFinished) { private void put(final Long traceId, final boolean onlyIfFinished) {
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); final EventBasedTrace traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed
boolean shouldSend; boolean shouldSend;
if (onlyIfFinished) { if (onlyIfFinished) {
...@@ -102,16 +102,16 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -102,16 +102,16 @@ public class TcpTraceReconstructionActor extends UntypedActor {
} }
} }
private void sendTraceBuffer(final TraceBuffer traceBuffer) { private void sendTraceBuffer(final EventBasedTrace traceBuffer) {
ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver; ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
receiver.tell(traceBuffer.toTraceEvents(), getSelf()); receiver.tell(traceBuffer, getSelf());
} }
public TimeUnit getTimeunit() { public TimeUnit getTimeunit() {
return timeunit; return timeunit;
} }
public void setTimeunit(TimeUnit timeunit) { public void setTimeunit(final TimeUnit timeunit) {
this.timeunit = timeunit; this.timeunit = timeunit;
} }
...@@ -119,7 +119,7 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -119,7 +119,7 @@ public class TcpTraceReconstructionActor extends UntypedActor {
return maxTraceDuration; return maxTraceDuration;
} }
public void setMaxTraceDuration(long maxTraceDuration) { public void setMaxTraceDuration(final long maxTraceDuration) {
this.maxTraceDuration = maxTraceDuration; this.maxTraceDuration = maxTraceDuration;
} }
...@@ -127,7 +127,7 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -127,7 +127,7 @@ public class TcpTraceReconstructionActor extends UntypedActor {
return maxTraceTimeout; return maxTraceTimeout;
} }
public void setMaxTraceTimeout(long maxTraceTimeout) { public void setMaxTraceTimeout(final long maxTraceTimeout) {
this.maxTraceTimeout = maxTraceTimeout; this.maxTraceTimeout = maxTraceTimeout;
} }
...@@ -135,7 +135,7 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -135,7 +135,7 @@ public class TcpTraceReconstructionActor extends UntypedActor {
return maxEncounteredLoggingTimestamp; return maxEncounteredLoggingTimestamp;
} }
public void setMaxEncounteredLoggingTimestamp(long maxEncounteredLoggingTimestamp) { public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp; this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
} }
} }
...@@ -12,8 +12,9 @@ import kiekerdays.tcpreader.TcpReaderActor; ...@@ -12,8 +12,9 @@ import kiekerdays.tcpreader.TcpReaderActor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
...@@ -24,14 +25,15 @@ public class TraceReconstructionAnalysisConfiguration { ...@@ -24,14 +25,15 @@ public class TraceReconstructionAnalysisConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class); private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private ActorRef tcpReaderActor; private ActorRef tcpReaderActor;
private Collection<Object> validTraces = new LinkedList<>(); private final Collection<Object> validTraces = new LinkedList<>();
private ActorRef rootActor; private ActorRef rootActor;
public TraceReconstructionAnalysisConfiguration() { public TraceReconstructionAnalysisConfiguration() {
traceId2trace = new ConcurrentHashMapWithDefault<>(EventBasedTraceFactory.INSTANCE);
init(); init();
} }
...@@ -39,15 +41,18 @@ public class TraceReconstructionAnalysisConfiguration { ...@@ -39,15 +41,18 @@ public class TraceReconstructionAnalysisConfiguration {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration"); ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors // specify actors
// Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it pollutes the heap // Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it
// pollutes the heap
Props collectorValidProps = Props.create(CounterActor.class); Props collectorValidProps = Props.create(CounterActor.class);
Props collectorInvalidProps = Props.create(CollectorActor.class); Props collectorInvalidProps = Props.create(CollectorActor.class);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, collectorValidProps, collectorInvalidProps); Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace,
collectorValidProps, collectorInvalidProps);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
// create actors // create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor"); // final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps,
// "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid"); // final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid"); // final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class, // final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
......
...@@ -4,12 +4,13 @@ import java.util.Iterator; ...@@ -4,12 +4,13 @@ import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kiekerdays.StopMessage; import kiekerdays.StopMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
...@@ -18,23 +19,23 @@ public class TcpTraceReductionActor extends UntypedActor { ...@@ -18,23 +19,23 @@ public class TcpTraceReductionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class); private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);
private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer; private final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer;
private ActorRef traceAggregationReceiver; private final ActorRef traceAggregationReceiver;
private long maxCollectionDurationInNs; private long maxCollectionDurationInNs;
public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer, Props traceAggregationProps) { public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer,
final Props traceAggregationProps) {
this.trace2buffer = trace2buffer; this.trace2buffer = trace2buffer;
// output "ports" // output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationProps); this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
} }
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (message instanceof TraceEventRecords) { if (message instanceof EventBasedTrace) {
TraceEventRecords traceEventRecords = (TraceEventRecords) message; EventBasedTrace eventBasedTrace = (EventBasedTrace) message;
final long timestamp = System.nanoTime(); this.countSameTraces(eventBasedTrace);
this.countSameTraces(traceEventRecords, timestamp);
} else if (message instanceof Long) { } else if (message instanceof Long) {
Long timestampInNs = (Long) message; Long timestampInNs = (Long) message;
if (timestampInNs != null) { if (timestampInNs != null) {
...@@ -48,11 +49,11 @@ public class TcpTraceReductionActor extends UntypedActor { ...@@ -48,11 +49,11 @@ public class TcpTraceReductionActor extends UntypedActor {
} }
} }
private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) { private void countSameTraces(final EventBasedTrace traceEventRecords) {
synchronized (this.trace2buffer) { synchronized (this.trace2buffer) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords); TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) { if (traceBuffer == null) {
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords); traceBuffer = new TraceAggregationBuffer(traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer); this.trace2buffer.put(traceEventRecords, traceBuffer);
} }
traceBuffer.count(); traceBuffer.count();
...@@ -62,21 +63,21 @@ public class TcpTraceReductionActor extends UntypedActor { ...@@ -62,21 +63,21 @@ public class TcpTraceReductionActor extends UntypedActor {
private void processTimeoutQueue(final long timestampInNs) { private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs; final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
synchronized (this.trace2buffer) { synchronized (this.trace2buffer) {
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) { for (final Iterator<Entry<EventBasedTrace, TraceAggregationBuffer>> iterator = this.trace2buffer
.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue(); final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " +
// traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)"); // + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) { if (traceBuffer.getBufferCreatedTimestampInNs() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords(); send(traceBuffer);
record.setCount(traceBuffer.getCount());
send(record);
} }
iterator.remove(); iterator.remove();
} }
} }
} }
private void send(final TraceEventRecords record) { private void send(final TraceAggregationBuffer record) {
traceAggregationReceiver.tell(record, getSelf()); traceAggregationReceiver.tell(record, getSelf());
} }
...@@ -89,11 +90,9 @@ public class TcpTraceReductionActor extends UntypedActor { ...@@ -89,11 +90,9 @@ public class TcpTraceReductionActor extends UntypedActor {
private void onTerminating() { private void onTerminating() {
LOGGER.debug("Terminating..."); LOGGER.debug("Terminating...");
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { for (final Entry<EventBasedTrace, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue(); final TraceAggregationBuffer aggregatedTrace = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords(); send(aggregatedTrace);
record.setCount(buffer.getCount());
send(record);
} }
this.trace2buffer.clear(); this.trace2buffer.clear();
} }
......
package kiekerdays.tcptracereduction;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* Buffer for similar traces that are to be aggregated into a single trace.
*
* @author Jan Waller, Florian Biss
*/
public final class TraceAggregationBuffer {
private final long bufferCreatedTimestamp;
private final TraceEventRecords aggregatedTrace;
private int countOfAggregatedTraces;
public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
this.aggregatedTrace = trace;
}
public void count() {
this.countOfAggregatedTraces++;
}
public long getBufferCreatedTimestamp() {
return this.bufferCreatedTimestamp;
}
public TraceEventRecords getTraceEventRecords() {
return this.aggregatedTrace;
}
public int getCount() {
return this.countOfAggregatedTraces;
}
}
package kiekerdays.tcptracereduction;
import java.io.Serializable;
import java.util.Comparator;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
/**
* @author Jan Waller, Florian Fittkau, Florian Biss
*/
public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable {
private static final long serialVersionUID = 8920766818232517L;
/**
* Creates a new instance of this class.
*/
public TraceComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
@Override
public int compare(final TraceEventRecords t1, final TraceEventRecords t2) {
final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents();
final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents();
if (recordsT1.length != recordsT2.length) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
for (int i = 0; i < recordsT1.length; i++) {
final AbstractTraceEvent recordT1 = recordsT1[i];
final AbstractTraceEvent recordT2 = recordsT2[i];
final int cmpClass = recordT1.getClass().getName()
.compareTo(recordT2.getClass().getName());
if (cmpClass != 0) {
return cmpClass;
}
if (recordT1 instanceof AbstractOperationEvent) {
final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature()
.compareTo(((AbstractOperationEvent) recordT2).getOperationSignature());
if (cmpSignature != 0) {
return cmpSignature;
}
}
if (recordT1 instanceof AfterOperationFailedEvent) {
final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo(
((AfterOperationFailedEvent) recordT2).getCause());
if (cmpError != 0) {
return cmpClass;
}
}
}
// All records match.
return 0;
}
}
...@@ -5,7 +5,6 @@ import java.util.LinkedList; ...@@ -5,7 +5,6 @@ import java.util.LinkedList;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kiekerdays.CounterActor; import kiekerdays.CounterActor;
import kiekerdays.StartMessage; import kiekerdays.StartMessage;
import kiekerdays.StopMessage; import kiekerdays.StopMessage;
...@@ -15,8 +14,11 @@ import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor; ...@@ -15,8 +14,11 @@ import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
import teetime.stage.trace.traceReduction.EventBasedTraceComperator;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
...@@ -24,20 +26,19 @@ import akka.actor.Props; ...@@ -24,20 +26,19 @@ import akka.actor.Props;
public class TraceReductionAnalysisConfiguration { public class TraceReductionAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class); private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2Buffer; private final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2Buffer;
private ActorRef tcpReaderActor; private ActorRef tcpReaderActor;
private Collection<Object> validTraces = new LinkedList<>(); private final Collection<Object> validTraces = new LinkedList<>();
private ActorRef rootActor; private ActorRef rootActor;
public TraceReductionAnalysisConfiguration() { public TraceReductionAnalysisConfiguration() {
trace2Buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
trace2Buffer = new TreeMap<EventBasedTrace, TraceAggregationBuffer>(new EventBasedTraceComperator());
init(); init();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment