Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package kiekerdays.tcpreconstruction;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import kiekerdays.Collector;
import kiekerdays.tcpreader.TcpReaderActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
public class TraceReconstructionAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private ActorRef tcpReaderActor;
private Collection<Object> validTraces = new LinkedList<>();
private ActorRef rootActor;
public TraceReconstructionAnalysisConfiguration() {
init();
}
private void init() {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors
Props collectorValidProps = Props.create(Collector.class, validTraces);
Props collectorInvalidProps = Props.create(Collector.class, Collections.emptyList());
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, collectorValidProps, collectorInvalidProps);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
rootActor = rootActorSelection.anchor();
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
LOGGER.info("Analysis started.");
rootActor.tell(PoisonPill.getInstance(), rootActor);
// tcpReaderActor.parent().tell(PoisonPill.getInstance(), getSelf());
// Future<Terminated> future = system.terminate();
// Await.ready(future, Duration.Inf());
LOGGER.info("Analysis stopped.");
}
public Collection<Object> getValidTraces() {
return validTraces;
}
}