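// NOTE(review): the bare words "Newer" and "Older" stood here; they look like diff-viewer labels, not Java source.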
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.io.network.AbstractTcpReader;
import teetime.util.network.AbstractRecordTcpReader;
import akka.actor.Props;
import akka.actor.UntypedActor;
import experiment.fse15.StartMessage;
import experiment.fse15.StopMessage;
public class TcpReaderActor extends UntypedActor {
/** Logger used by this actor and handed to both TCP readers. */
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class);

/** Registry that the string-record reader fills and the monitoring-record reader is constructed with. */
private final ILookup<String> stringRegistry = new Lookup<String>();

/** Child actor that is sent every received monitoring record and every forwarded stop message. */
private final ActorRef tcpTraceReconstructionActor;

/** Reader constructed with {@code port1}; passes each received record on to the child actor. */
private final AbstractRecordTcpReader tcpMonitoringRecordReader;

/** Reader constructed with {@code port2}; registers each received buffer in {@code stringRegistry}. */
private final AbstractTcpReader tcpStringRecordReader;

/**
 * Thread that runs {@code tcpStringRecordReader}. It is assigned in {@code onStarting()} and therefore
 * cannot be final; it is {@code null} until that method has run.
 */
private Thread tcpStringRecordReaderThread;

/** Port passed to the monitoring-record reader. */
private final int port1 = 10133;

/** Port passed to the string-record reader. */
private final int port2 = 10134;

/** Buffer capacity passed to both readers. */
private final int bufferCapacity = 65535;
/**
 * Creates the actor, announces it to the watcher, spawns the trace reconstruction child and builds the
 * two TCP readers.
 *
 * @param tcpTraceReconstructionProps
 *            props used to create the child actor that receives the monitoring records
 * @param watchActor
 *            actor that is sent a {@code WatchMe} message wrapping this actor's own reference
 */
public TcpReaderActor(final Props tcpTraceReconstructionProps, final ActorRef watchActor) {
    watchActor.tell(new WatchMe(getSelf()), getSelf());

    this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps);

    // Every monitoring record read from port1 is handed to the child actor.
    this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) {
        @Override
        protected void onRecordReceived(final IMonitoringRecord record) {
            tcpTraceReconstructionActor.tell(record, getSelf());
        }
    };

    // Every buffer read from port2 is registered in the string registry shared with the reader above.
    this.tcpStringRecordReader = new AbstractTcpReader(port2, bufferCapacity, LOGGER) {
        @Override
        protected boolean onBufferReceived(final ByteBuffer buffer) {
            RegistryRecord.registerRecordInRegistry(buffer, stringRegistry);
            return true;
        }
    };
}
/**
 * Handles a message delivered to this actor.
 * <p>
 * The class name of every incoming message is logged at debug level. A {@link StopMessage} is forwarded
 * unchanged to the trace reconstruction actor. An exception caught by the handler's {@code catch} clause is
 * logged at error level.
 * <p>
 * NOTE(review): as written the body is incomplete; see the note inside the {@code try} block.
 *
 * @param message
 *            the received message
 * @throws Exception
 *             declared by the signature; the visible body has no explicit {@code throw}
 */
public void onReceive(final Object message) throws Exception {
LOGGER.debug("Message: " + message.getClass().getName());
try {
// NOTE(review): the `if` branch that the following `else if` continues is not present here, and the
// brace closing the `else if` body before `catch` is missing as well. StartMessage is imported and
// onStarting()/onTerminating() are otherwise unused, so presumably a StartMessage branch belongs
// here -- confirm against the original file before restoring it.
} else if (message instanceof StopMessage) {
tcpTraceReconstructionActor.tell(message, getSelf());
} catch (Exception e) {
LOGGER.error("Exception while executing TcpReader.", e);
}
// Disabled shutdown alternatives, kept for reference; none of them is executed.
// context().stop(getSelf());
// getSelf().tell(PoisonPill.getInstance(), getSelf());
// tcpTraceReconstructionActor.tell(PoisonPill.getInstance(), getSelf());
// context().parent().tell(PoisonPill.getInstance(), getSelf());
// context().system().terminate();
}
/**
 * Sends a {@code PoisonPill} to the anchor of the actor selection built from this actor's root path.
 * The anchor is also used as the sender of that message.
 */
private void terminate() {
    final ActorRef anchor = context().system().actorSelection(self().path().root()).anchor();
    anchor.tell(PoisonPill.getInstance(), anchor);
}
/**
 * Logs the start and launches the string-record reader on a freshly created thread, which is kept in
 * {@code tcpStringRecordReaderThread}.
 */
private void onStarting() {
    LOGGER.debug("Starting...");

    final Thread readerThread = new Thread(tcpStringRecordReader);
    this.tcpStringRecordReaderThread = readerThread;
    readerThread.start();
}
private void onTerminating() {
LOGGER.debug("Terminating...");
tcpStringRecordReader.terminate();
tcpStringRecordReaderThread.interrupt();
/**
 * Logs at info level that the actor has stopped, then delegates to the superclass hook.
 *
 * @throws Exception
 *             if the superclass hook throws
 */
@Override
public void postStop() throws Exception {
    LOGGER.info("stopped");
    super.postStop();
}
}