Newer
Older
import java.util.concurrent.TimeUnit;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import experiment.fse15.StopMessage;
import experiment.fse15.WatchActor.WatchMe;
public class TcpTraceReconstructionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReconstructionActor.class);
private TimeUnit timeunit;
private long maxTraceDuration = Long.MAX_VALUE;
private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1;
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private final ActorRef validTraceReceiver;
private final ActorRef invalidTraceReceiver;
public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) {
super();
this.traceId2trace = traceId2trace;
this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
watchActor.tell(new WatchMe(getSelf()), getSelf());
public void onReceive(final Object message) throws Exception {
if (message instanceof IFlowRecord) {
final Long traceId = this.reconstructTrace((IFlowRecord) message);
if (traceId != null) {
this.put(traceId, true);
}
} else if (message instanceof StopMessage) {
this.onTerminating();
validTraceReceiver.tell(message, getSelf());
invalidTraceReceiver.tell(message, getSelf());
} else {
unhandled(message);
}
}
private Long reconstructTrace(final IFlowRecord record) {
Long traceId = null;
if (record instanceof TraceMetadata) {
traceId = ((TraceMetadata) record).getTraceId();
EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.setTrace((TraceMetadata) record);
} else if (record instanceof AbstractTraceEvent) {
traceId = ((AbstractTraceEvent) record).getTraceId();
EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.insertEvent((AbstractTraceEvent) record);
}
return traceId;
}
private void put(final Long traceId, final boolean onlyIfFinished) {
final EventBasedTrace traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed
boolean shouldSend;
if (onlyIfFinished) {
shouldSend = traceBuffer.isFinished();
} else {
shouldSend = true;
}
if (shouldSend) {
boolean removed = (null != this.traceId2trace.remove(traceId));
if (removed) {
this.sendTraceBuffer(traceBuffer);
}
}
}
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped");
super.postStop();
}
private void onTerminating() {
LOGGER.trace("traces left: " + traceId2trace.keySet());
for (Long traceId : this.traceId2trace.keySet()) {
this.put(traceId, false);
}
}
private void sendTraceBuffer(final EventBasedTrace traceBuffer) {
ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
receiver.tell(traceBuffer, getSelf());
}
public TimeUnit getTimeunit() {
return timeunit;
}
public void setTimeunit(final TimeUnit timeunit) {
this.timeunit = timeunit;
}
public long getMaxTraceDuration() {
return maxTraceDuration;
}
public void setMaxTraceDuration(final long maxTraceDuration) {
this.maxTraceDuration = maxTraceDuration;
}
public long getMaxTraceTimeout() {
return maxTraceTimeout;
}
public void setMaxTraceTimeout(final long maxTraceTimeout) {
this.maxTraceTimeout = maxTraceTimeout;
}
public long getMaxEncounteredLoggingTimestamp() {
return maxEncounteredLoggingTimestamp;
}
public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
}
}