Skip to content
Snippets Groups Projects
Select Git revision
  • 536a86d0b6da20b85404d88142df5facf6740cce
  • master default
2 results

TraceReductionAnalysisConfiguration.java

Blame
  • user avatar
    Christian Wulf authored
    536a86d0
    History
    TraceReductionAnalysisConfiguration.java 4.15 KiB
    package kiekerdays.tcptracereduction;
    
    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    
    import kiekerdays.CounterActor;
    import kiekerdays.StartMessage;
    import kiekerdays.StopMessage;
    import kiekerdays.tcpreader.TcpReaderActor;
    import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import teetime.stage.trace.traceReconstruction.EventBasedTrace;
    import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
    import teetime.stage.trace.traceReduction.EventBasedTraceComperator;
    import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
    import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
    import akka.actor.ActorRef;
    import akka.actor.ActorSelection;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    
    /**
     * Sets up an Akka actor system for the TCP trace reduction analysis and
     * offers a {@link #start()} method that sends the start and stop requests
     * to the TCP reader actor.
     * <p>
     * Only the TCP reader actor is instantiated by this class. The remaining
     * stages (trace reconstruction, trace reduction, counter) are described as
     * {@link Props} and passed along as constructor arguments.
     */
    public class TraceReductionAnalysisConfiguration {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);
    
    	/** Name under which the actor system is created. */
    	private static final String ACTOR_SYSTEM_NAME = "TraceReductionAnalysisConfiguration";
    	/** Name under which the TCP reader actor is registered in the actor system. */
    	private static final String TCP_READER_ACTOR_NAME = "TcpReaderActor";
    
    	/** Handed to the trace reconstruction stage; missing entries are produced by {@link EventBasedTraceFactory}. */
    	private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
    	/** Handed to the trace reduction stage; keys are ordered by {@link EventBasedTraceComperator}. */
    	private final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2Buffer;
    
    	private ActorRef tcpReaderActor;
    	/** Returned by {@link #getValidTraces()}; this class itself never adds elements to it. */
    	private final Collection<Object> validTraces = new LinkedList<>();
    
    	/** Used as the sender of the stop request in {@link #start()}. */
    	private ActorRef rootActor;
    
    	/**
    	 * Creates the shared maps and then builds the actor system including the
    	 * TCP reader actor.
    	 */
    	public TraceReductionAnalysisConfiguration() {
    		this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
    		this.trace2Buffer = new TreeMap<EventBasedTrace, TraceAggregationBuffer>(new EventBasedTraceComperator());
    		this.init();
    	}
    
    	/**
    	 * Creates the actor system, describes the processing stages as
    	 * {@link Props}, and instantiates the TCP reader actor.
    	 */
    	private void init() {
    		final ActorSystem actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);
    
    		// Describe the stages from the last one to the first one: each stage
    		// receives the Props of its successor as a constructor argument.
    		// NOTE(review): presumably each actor creates its successor from these
    		// Props itself -- confirm in the actor classes.
    		final Props counter = Props.create(CounterActor.class);
    		final Props reduction = Props.create(TcpTraceReductionActor.class, this.trace2Buffer, counter);
    		final Props reconstruction = Props.create(TcpTraceReconstructionActor.class, this.traceId2trace, reduction, Props.empty());
    		final Props reader = Props.create(TcpReaderActor.class, reconstruction);
    
    		// The reader is the only actor created directly on the actor system.
    		this.tcpReaderActor = actorSystem.actorOf(reader, TCP_READER_ACTOR_NAME);
    
    		// Resolve the anchor of a selection on the root of the reader's path.
    		final ActorSelection rootSelection = actorSystem.actorSelection(this.tcpReaderActor.path().root());
    		this.rootActor = rootSelection.anchor();
    
    		LOGGER.info("Configuration initialized.");
    	}
    
    	/**
    	 * Sends a {@link StartMessage} (without sender) and directly afterwards a
    	 * {@link StopMessage} (with the root actor as sender) to the TCP reader
    	 * actor, logging each step.
    	 * <p>
    	 * The log messages are written right after the corresponding request has
    	 * been handed to the actor; whether the actor has already processed it at
    	 * that point is not visible from this class. The actor system is not shut
    	 * down by this method.
    	 *
    	 * @throws Exception
    	 *             declared by the signature and kept for caller compatibility
    	 */
    	public void start() throws Exception {
    		LOGGER.info("Starting analysis...");
    		this.sendToReader(new StartMessage(), ActorRef.noSender());
    		LOGGER.info("Analysis started.");
    
    		this.sendToReader(new StopMessage(), this.rootActor);
    		LOGGER.info("Analysis stopped.");
    	}
    
    	/** Passes the given message with the given sender to the TCP reader actor. */
    	private void sendToReader(final Object message, final ActorRef sender) {
    		this.tcpReaderActor.tell(message, sender);
    	}
    
    	/**
    	 * @return the internal collection of valid traces (the instance itself,
    	 *         not a copy)
    	 */
    	public Collection<Object> getValidTraces() {
    		return this.validTraces;
    	}
    
    }