// TraceReductionAnalysisConfiguration.java
package kiekerdays.tcptracereduction;

import java.util.Collection;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.TreeMap;

import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kiekerdays.CounterActor;
import kiekerdays.tcpreader.TcpReaderActor;
import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;

/**
 * Wires up the Akka actors of the TCP trace reduction analysis and offers a
 * single {@link #start()} entry point to kick it off.
 * <p>
 * Only the {@code TcpReaderActor} is created directly in the actor system. The
 * {@link Props} of the reconstruction, reduction and counter actors are handed
 * down as constructor arguments (reader -> reconstruction -> reduction ->
 * counter); presumably each actor creates its successor from the Props it
 * receives - verify against the actor implementations.
 * <p>
 * NOTE(review): the {@link ActorSystem} created during initialization is not
 * retained by this class, so it cannot be terminated through this object.
 */
public class TraceReductionAnalysisConfiguration {

	/** Message sent to the reader actor by {@link #start()}. */
	private static final String START_MESSAGE = "start";

	private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);

	/** Passed to the reconstruction actor; missing keys default to a {@link TraceBuffer}. */
	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
	/** Passed to the reduction actor; ordered by {@code TraceComperator}. */
	private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2Buffer;

	/** Nothing in this class adds to this collection; it is only exposed via {@link #getValidTraces()}. */
	private final Collection<Object> validTraces = new LinkedList<>();

	/** Entry actor of the pipeline; receives {@link #START_MESSAGE}. */
	private ActorRef tcpReaderActor;
	/** Anchor of the actor selection built on the root path of the reader actor. */
	private ActorRef rootActor;

	/**
	 * Creates the configuration, including its own actor system and the reader actor.
	 */
	public TraceReductionAnalysisConfiguration() {
		trace2Buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
		init();
	}

	/**
	 * Creates the actor system, builds the nested actor Props and instantiates the
	 * reader actor.
	 */
	private void init() {
		ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");

		// specify actors: each Props is a constructor argument of the preceding stage
		Props counterProps = Props.create(CounterActor.class);
		Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps);
		Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, null);
		Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);

		// create actors: only the reader is created here, assigned directly to the field
		tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");

		// resolve the actor that anchors a selection on the root of the reader's path
		ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
		rootActor = rootActorSelection.anchor();

		LOGGER.info("Configuration initialized.");
	}

	/**
	 * Sends the start message to the reader actor and then a {@link PoisonPill}
	 * to the root actor.
	 * <p>
	 * NOTE(review): both messages are sent with {@code tell}, and no waiting for
	 * termination happens in this method, so "Analysis stopped." is logged once
	 * the stop request has been sent - confirm whether awaiting the actual
	 * termination is intended here.
	 *
	 * @throws Exception
	 *             declared for callers; nothing in this method throws a checked exception
	 */
	public void start() throws Exception {
		LOGGER.info("Starting analysis...");
		tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
		LOGGER.info("Analysis started.");

		// request shutdown via the root actor; the root actor is also given as sender
		rootActor.tell(PoisonPill.getInstance(), rootActor);
		LOGGER.info("Analysis stopped.");
	}

	/**
	 * Returns the collection of valid traces.
	 *
	 * @return the internal (mutable) collection itself, not a copy
	 */
	public Collection<Object> getValidTraces() {
		return validTraces;
	}

}