package kiekerdays.tcpreconstruction;

import java.util.Collection;
import java.util.LinkedList;

import kiekerdays.CollectorActor;
import kiekerdays.CounterActor;
import kiekerdays.StartMessage;
import kiekerdays.StopMessage;
import kiekerdays.tcpreader.TcpReaderActor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;

/**
 * Sets up an Akka actor system for the TCP-based trace reconstruction analysis and drives it
 * through a start/stop cycle.
 *
 * <p>Construction creates the actor system and the top-level {@code TcpReaderActor};
 * {@link #start()} then sends the start, stop and shutdown messages.
 */
public class TraceReconstructionAnalysisConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);

    /** Trace-id to trace map that is passed as a constructor argument to the reconstruction actor's {@link Props}. */
    private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace =
            new ConcurrentHashMapWithDefault<>(EventBasedTraceFactory.INSTANCE);

    /** Returned by {@link #getValidTraces()}; nothing in this class ever adds to it. */
    private final Collection<Object> validTraces = new LinkedList<>();

    /** Top-level reader actor; target of the start and stop messages. */
    private ActorRef tcpReaderActor;

    /** Anchor of the selection on the reader's root path; receives the poison pill in {@link #start()}. */
    private ActorRef rootActor;

    /**
     * Creates the configuration and immediately builds the actor system and its reader actor.
     */
    public TraceReconstructionAnalysisConfiguration() {
        createActors();
    }

    /**
     * Creates the actor system, describes the actors via {@link Props} and instantiates the
     * reader as the only actor created directly by this class.
     */
    private void createActors() {
        // NOTE(review): the system name says "Reduction" while this class is about
        // "Reconstruction" - possibly a copy/paste leftover; kept as is because it is a runtime name.
        final ActorSystem actorSystem = ActorSystem.create("TraceReductionAnalysisConfiguration");

        // Original author's rationale: a collector is not used for the valid traces
        // since it pollutes the heap, hence the CounterActor.
        final Props validSinkProps = Props.create(CounterActor.class);
        final Props invalidSinkProps = Props.create(CollectorActor.class);

        // The downstream Props are only handed over as constructor arguments; presumably the
        // receiving actors create their successors themselves - verify in those actor classes.
        final Props reconstructionProps = Props.create(
                TcpTraceReconstructionActor.class, traceId2trace, validSinkProps, invalidSinkProps);
        final Props readerProps = Props.create(TcpReaderActor.class, reconstructionProps);

        this.tcpReaderActor = actorSystem.actorOf(readerProps, "TcpReaderActor");

        // Resolve the root of the reader's path and remember the selection's anchor for shutdown.
        final ActorSelection rootSelection = actorSystem.actorSelection(this.tcpReaderActor.path().root());
        this.rootActor = rootSelection.anchor();

        LOGGER.info("Configuration initialized.");
    }

    /**
     * Sends a {@link StartMessage} followed by a {@link StopMessage} to the reader actor and
     * finally a {@link PoisonPill} to the root anchor.
     *
     * <p>NOTE(review): {@code tell} only enqueues a message, so the "started"/"stopped" log lines
     * presumably mark when the requests were sent, not when the actors have finished - confirm.
     *
     * @throws Exception declared as part of the public signature; no statement in this method
     *         throws a checked exception
     */
    public void start() throws Exception {
        final ActorRef noSender = ActorRef.noSender();

        LOGGER.info("Starting analysis...");
        tcpReaderActor.tell(new StartMessage(), noSender);
        LOGGER.info("Analysis started.");

        tcpReaderActor.tell(new StopMessage(), noSender);

        // The root anchor is addressed with itself as the sender.
        rootActor.tell(PoisonPill.getInstance(), rootActor);
        LOGGER.info("Analysis stopped.");
    }

    /**
     * Returns the internal collection of valid traces.
     *
     * @return the live (not copied) collection held by this configuration
     */
    public Collection<Object> getValidTraces() {
        return validTraces;
    }

}