// TcpTraceReductionActor.java
package kiekerdays.tcptracereduction;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;

import kieker.analysis.plugin.filter.flow.TraceEventRecords;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

/**
 * Actor that aggregates equal {@link TraceEventRecords} messages into {@link TraceAggregationBuffer}s and
 * forwards each aggregated trace (with its occurrence count set) to a child actor created from the
 * {@link Props} passed to the constructor.
 *
 * <p>
 * Handled messages:
 * <ul>
 * <li>{@link TraceEventRecords}: counted in the buffer stored under the same map key; a new buffer is
 * created when the map has no entry for it.</li>
 * <li>{@link Long}: treated as a "current time" tick in nanoseconds; every buffer created at least
 * {@link #getMaxCollectionDuration()} nanoseconds before that tick is forwarded and removed.</li>
 * </ul>
 * Every other message is passed to {@code unhandled}.
 */
public class TcpTraceReductionActor extends UntypedActor {

	private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);

	/** Buffers keyed by trace; supplied by the creator of this actor and guarded by its own monitor. */
	private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
	/** Output "port": child actor that receives the aggregated traces. */
	private final ActorRef traceAggregationReceiver;

	/** Age in nanoseconds after which a buffer is flushed by a tick; 0 until the setter is called. */
	private long maxCollectionDurationInNs;

	/**
	 * @param trace2buffer
	 *            map used to look up the aggregation buffer of a trace; it is mutated by this actor
	 * @param traceAggregationProps
	 *            props used to create the child actor that receives the aggregated traces
	 */
	public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer, Props traceAggregationProps) {
		this.trace2buffer = trace2buffer;
		// output "ports"
		this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
	}

	/**
	 * Dispatches on the message type; see the class description for the handled types.
	 *
	 * @param message
	 *            the received message
	 */
	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof TraceEventRecords) {
			// Buffers are stamped with the local arrival time.
			// NOTE(review): Long ticks are compared against these System.nanoTime() values, so the sender
			// of the ticks presumably uses the same time source -- confirm.
			final long timestamp = System.nanoTime();
			this.countSameTraces((TraceEventRecords) message, timestamp);
		} else if (message instanceof Long) {
			// instanceof already guarantees a non-null Long, so no further null check is required
			this.processTimeoutQueue((Long) message);
		} else {
			unhandled(message);
		}
	}

	/**
	 * Increments the counter of the buffer stored for the given trace, creating the buffer first if the map
	 * has no entry for it.
	 *
	 * @param traceEventRecords
	 *            the received trace
	 * @param timestamp
	 *            creation timestamp handed to a newly created buffer
	 */
	private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
		synchronized (this.trace2buffer) {
			TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
			if (traceBuffer == null) {
				traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
				this.trace2buffer.put(traceEventRecords, traceBuffer);
			}
			traceBuffer.count();
		}
	}

	/**
	 * Forwards and removes every buffer whose creation timestamp is not later than
	 * {@code timestampInNs - maxCollectionDurationInNs}. Younger buffers stay in the map so that they keep
	 * collecting until a later tick (or until the actor stops).
	 *
	 * @param timestampInNs
	 *            the tick's notion of "now" in nanoseconds
	 */
	private void processTimeoutQueue(final long timestampInNs) {
		final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
		synchronized (this.trace2buffer) {
			final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator();
			while (iterator.hasNext()) {
				final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
				if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
					this.flush(traceBuffer);
					// Remove only what has been forwarded; removing unconditionally would silently
					// discard the counts of buffers that have not timed out yet.
					iterator.remove();
				}
			}
		}
	}

	/**
	 * Writes the buffer's current count into its trace and sends that trace to the receiver.
	 *
	 * @param traceBuffer
	 *            the buffer to forward
	 */
	private void flush(final TraceAggregationBuffer traceBuffer) {
		final TraceEventRecords record = traceBuffer.getTraceEventRecords();
		record.setCount(traceBuffer.getCount());
		send(record);
	}

	/**
	 * Sends the given trace to the receiver actor with this actor as sender.
	 *
	 * @param record
	 *            the aggregated trace
	 */
	private void send(final TraceEventRecords record) {
		traceAggregationReceiver.tell(record, getSelf());
	}

	/**
	 * Logs the stop, forwards all remaining buffers and then delegates to the superclass.
	 */
	@Override
	public void postStop() throws Exception {
		LOGGER.info("stopped");
		onTerminating();
		super.postStop();
	}

	/**
	 * Forwards every remaining buffer regardless of its age and empties the map.
	 */
	private void onTerminating() {
		// NOTE(review): the receiver is created as a child of this actor; confirm that it can still
		// process messages at the time postStop() runs.
		synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
			for (final TraceAggregationBuffer buffer : this.trace2buffer.values()) {
				this.flush(buffer);
			}
			this.trace2buffer.clear();
		}
	}

	/**
	 * @return the age in nanoseconds after which a buffer is flushed by a tick
	 */
	public long getMaxCollectionDuration() {
		return this.maxCollectionDurationInNs;
	}

	/**
	 * @param maxCollectionDuration
	 *            the age in nanoseconds after which a buffer is flushed by a tick
	 */
	public void setMaxCollectionDuration(final long maxCollectionDuration) {
		this.maxCollectionDurationInNs = maxCollectionDuration;
	}

}