package akka.io.net;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import kieker.common.record.flow.IFlowRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.util.ISendTraceBuffer;
import teetime.util.TraceReconstructor;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.common.StopMessage;
import akka.common.WatchActor.WatchMe;

public class TcpTraceReconstructionActor extends UntypedActor implements ISendTraceBuffer {

	private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReconstructionActor.class);

	private TimeUnit timeunit;
	private long maxTraceDuration = Long.MAX_VALUE;
	private long maxTraceTimeout = Long.MAX_VALUE;
	private long maxEncounteredLoggingTimestamp = -1;

	private final TraceReconstructor reconstructor;
	private final ActorRef validTraceReceiver;
	private final ActorRef invalidTraceReceiver;
	private final Set<Thread> threads = Collections.newSetFromMap(new ConcurrentHashMap<Thread, Boolean>());

	public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
			final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, final ActorRef watchActor) {
		super();
		this.reconstructor = new TraceReconstructor(traceId2trace, this);
		this.validTraceReceiver = context().actorOf(validTraceReceiverProps, "Valid-receiver");
		this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps, "Invalid-receiver");

		watchActor.tell(new WatchMe(getSelf()), getSelf());
	}

	@Override
	public void onReceive(final Object message) throws Exception {
		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("onReceive: " + message);
			LOGGER.trace("Current thread: " + Thread.currentThread());
		}

		threads.add(Thread.currentThread());

		if (message instanceof IFlowRecord) {
			final IFlowRecord flowRecord = (IFlowRecord) message;
			reconstructor.execute(flowRecord);
		} else if (message instanceof StopMessage) {
			this.onTerminating();
			validTraceReceiver.tell(message, getSelf());
			invalidTraceReceiver.tell(message, getSelf());
		} else {
			unhandled(message);
		}
	}

	@Override
	public void postStop() throws Exception {
		LOGGER.info("stopped");
		super.postStop();
	}

	private void onTerminating() {
		LOGGER.debug("traces left: " + reconstructor.getTraceId2trace().keySet());
		LOGGER.trace("Current thread: " + Thread.currentThread());
		reconstructor.terminate();
		LOGGER.debug("Threads (" + threads.size() + "): " + threads);
	}

	@Override
	public void sendTraceBuffer(final EventBasedTrace traceBuffer) {
		ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
		receiver.tell(traceBuffer, getSelf());
	}

	public TimeUnit getTimeunit() {
		return timeunit;
	}

	public void setTimeunit(final TimeUnit timeunit) {
		this.timeunit = timeunit;
	}

	public long getMaxTraceDuration() {
		return maxTraceDuration;
	}

	public void setMaxTraceDuration(final long maxTraceDuration) {
		this.maxTraceDuration = maxTraceDuration;
	}

	public long getMaxTraceTimeout() {
		return maxTraceTimeout;
	}

	public void setMaxTraceTimeout(final long maxTraceTimeout) {
		this.maxTraceTimeout = maxTraceTimeout;
	}

	public long getMaxEncounteredLoggingTimestamp() {
		return maxEncounteredLoggingTimestamp;
	}

	public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
		this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
	}
}