Skip to content
Snippets Groups Projects
TcpTraceReconstructionActor.java 4.3 KiB
Newer Older
package experiment.fse15.tcpreconstruction;
Christian Wulf's avatar
Christian Wulf committed

import java.util.concurrent.TimeUnit;

import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.stage.trace.traceReconstruction.EventBasedTrace;
Christian Wulf's avatar
Christian Wulf committed
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
Christian Wulf's avatar
Christian Wulf committed
import experiment.fse15.StopMessage;
import experiment.fse15.WatchActor.WatchMe;
Christian Wulf's avatar
Christian Wulf committed

public class TcpTraceReconstructionActor extends UntypedActor {

	private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReconstructionActor.class);

	private TimeUnit timeunit;
	private long maxTraceDuration = Long.MAX_VALUE;
	private long maxTraceTimeout = Long.MAX_VALUE;
	private long maxEncounteredLoggingTimestamp = -1;

	private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
	private final ActorRef validTraceReceiver;
	private final ActorRef invalidTraceReceiver;
Christian Wulf's avatar
Christian Wulf committed

	public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
Christian Wulf's avatar
Christian Wulf committed
			final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) {
Christian Wulf's avatar
Christian Wulf committed
		super();
		this.traceId2trace = traceId2trace;
		this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
		this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
Christian Wulf's avatar
Christian Wulf committed

		watchActor.tell(new WatchMe(getSelf()), getSelf());
Christian Wulf's avatar
Christian Wulf committed
	}

	@Override
	public void onReceive(final Object message) throws Exception {
Christian Wulf's avatar
Christian Wulf committed
		if (message instanceof IFlowRecord) {
			final Long traceId = this.reconstructTrace((IFlowRecord) message);
			if (traceId != null) {
				this.put(traceId, true);
			}
		} else if (message instanceof StopMessage) {
			this.onTerminating();
			validTraceReceiver.tell(message, getSelf());
			invalidTraceReceiver.tell(message, getSelf());
Christian Wulf's avatar
Christian Wulf committed
		} else {
			unhandled(message);
		}
	}

	private Long reconstructTrace(final IFlowRecord record) {
		Long traceId = null;
		if (record instanceof TraceMetadata) {
			traceId = ((TraceMetadata) record).getTraceId();
			EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
Christian Wulf's avatar
Christian Wulf committed

			traceBuffer.setTrace((TraceMetadata) record);
		} else if (record instanceof AbstractTraceEvent) {
			traceId = ((AbstractTraceEvent) record).getTraceId();
			EventBasedTrace traceBuffer = this.traceId2trace.getOrCreate(traceId);
Christian Wulf's avatar
Christian Wulf committed

			traceBuffer.insertEvent((AbstractTraceEvent) record);
		}

		return traceId;
	}

	private void put(final Long traceId, final boolean onlyIfFinished) {
		final EventBasedTrace traceBuffer = this.traceId2trace.get(traceId);
Christian Wulf's avatar
Christian Wulf committed
		if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed
			boolean shouldSend;
			if (onlyIfFinished) {
				shouldSend = traceBuffer.isFinished();
			} else {
				shouldSend = true;
			}

			if (shouldSend) {
				boolean removed = (null != this.traceId2trace.remove(traceId));
				if (removed) {
					this.sendTraceBuffer(traceBuffer);
				}
			}
		}
	}

	@Override
	public void postStop() throws Exception {
		LOGGER.info("stopped");
		super.postStop();
	}

	private void onTerminating() {
		LOGGER.trace("traces left: " + traceId2trace.keySet());
		for (Long traceId : this.traceId2trace.keySet()) {
			this.put(traceId, false);
		}
	}

	private void sendTraceBuffer(final EventBasedTrace traceBuffer) {
Christian Wulf's avatar
Christian Wulf committed
		ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
		receiver.tell(traceBuffer, getSelf());
Christian Wulf's avatar
Christian Wulf committed
	}

	public TimeUnit getTimeunit() {
		return timeunit;
	}

	public void setTimeunit(final TimeUnit timeunit) {
Christian Wulf's avatar
Christian Wulf committed
		this.timeunit = timeunit;
	}

	public long getMaxTraceDuration() {
		return maxTraceDuration;
	}

	public void setMaxTraceDuration(final long maxTraceDuration) {
Christian Wulf's avatar
Christian Wulf committed
		this.maxTraceDuration = maxTraceDuration;
	}

	public long getMaxTraceTimeout() {
		return maxTraceTimeout;
	}

	public void setMaxTraceTimeout(final long maxTraceTimeout) {
Christian Wulf's avatar
Christian Wulf committed
		this.maxTraceTimeout = maxTraceTimeout;
	}

	public long getMaxEncounteredLoggingTimestamp() {
		return maxEncounteredLoggingTimestamp;
	}

	public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
Christian Wulf's avatar
Christian Wulf committed
		this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
	}
}