Skip to content
Snippets Groups Projects
TcpReaderActor.java 3.41 KiB
Newer Older
package experiment.fse15.tcpreader;
Christian Wulf's avatar
Christian Wulf committed

Christian Wulf's avatar
Christian Wulf committed
import java.nio.ByteBuffer;

Christian Wulf's avatar
Christian Wulf committed
import kieker.common.record.IMonitoringRecord;
Christian Wulf's avatar
Christian Wulf committed
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
Christian Wulf's avatar
Christian Wulf committed

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Christian Wulf's avatar
Christian Wulf committed
import teetime.util.io.network.AbstractTcpReader;
import teetime.util.network.AbstractRecordTcpReader;
Christian Wulf's avatar
Christian Wulf committed
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
Christian Wulf's avatar
Christian Wulf committed
import akka.actor.Props;
import akka.actor.UntypedActor;
import experiment.fse15.StartMessage;
import experiment.fse15.StopMessage;
Christian Wulf's avatar
Christian Wulf committed
import experiment.fse15.WatchActor.WatchMe;
Christian Wulf's avatar
Christian Wulf committed

public class TcpReaderActor extends UntypedActor {

	private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class);

Christian Wulf's avatar
Christian Wulf committed
	private final ILookup<String> stringRegistry = new Lookup<String>();
	private final ActorRef tcpTraceReconstructionActor;
	private final AbstractRecordTcpReader tcpMonitoringRecordReader;
	private final AbstractTcpReader tcpStringRecordReader;
Christian Wulf's avatar
Christian Wulf committed

Christian Wulf's avatar
Christian Wulf committed
	private Thread tcpStringRecordReaderThread;
	private final int port1 = 10133;
	private final int port2 = 10134;
	private final int bufferCapacity = 65535;
Christian Wulf's avatar
Christian Wulf committed

Christian Wulf's avatar
Christian Wulf committed
	public TcpReaderActor(final Props tcpTraceReconstructionProps, ActorRef watchActor) {
Christian Wulf's avatar
Christian Wulf committed
		super();
Christian Wulf's avatar
Christian Wulf committed

Christian Wulf's avatar
Christian Wulf committed
		watchActor.tell(new WatchMe(getSelf()), getSelf());

Christian Wulf's avatar
Christian Wulf committed
		this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps);
Christian Wulf's avatar
Christian Wulf committed

		this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) {
			@Override
			protected void onRecordReceived(final IMonitoringRecord record) {
				tcpTraceReconstructionActor.tell(record, getSelf());
			}
		};

		this.tcpStringRecordReader = new AbstractTcpReader(port2, bufferCapacity, LOGGER) {
			@Override
			protected boolean onBufferReceived(final ByteBuffer buffer) {
				RegistryRecord.registerRecordInRegistry(buffer, stringRegistry);
				return true;
			}
		};
Christian Wulf's avatar
Christian Wulf committed
	}

	@Override
	public void onReceive(final Object message) throws Exception {
Christian Wulf's avatar
Christian Wulf committed
		LOGGER.debug("Message: " + message.getClass().getName());
		try {
			if (message instanceof StartMessage) {
Christian Wulf's avatar
Christian Wulf committed
				onStarting();
				tcpMonitoringRecordReader.run();
				self().tell(new StopMessage(), self());
			} else if (message instanceof StopMessage) {
				onTerminating();
				tcpTraceReconstructionActor.tell(message, getSelf());
Christian Wulf's avatar
Christian Wulf committed
				terminate();
			} else {
				unhandled(message);
			}
Christian Wulf's avatar
Christian Wulf committed
		} catch (Exception e) {
			LOGGER.error("Exception while executing TcpReader.", e);
		}
		// context().stop(getSelf());
		// getSelf().tell(PoisonPill.getInstance(), getSelf());
		// tcpTraceReconstructionActor.tell(PoisonPill.getInstance(), getSelf());
		// context().parent().tell(PoisonPill.getInstance(), getSelf());
		// context().system().terminate();
	}

	private void terminate() {
		ActorSelection rootActorSelection = context().system().actorSelection(self().path().root());
		ActorRef rootActor = rootActorSelection.anchor();
		rootActor.tell(PoisonPill.getInstance(), rootActor);
	}

Christian Wulf's avatar
Christian Wulf committed
	private void onStarting() {
		LOGGER.debug("Starting...");
		this.tcpStringRecordReaderThread = new Thread(tcpStringRecordReader);
		this.tcpStringRecordReaderThread.start();
	}

	private void onTerminating() {
		LOGGER.debug("Terminating...");
Christian Wulf's avatar
Christian Wulf committed
		tcpStringRecordReader.terminate();
		tcpStringRecordReaderThread.interrupt();
Christian Wulf's avatar
Christian Wulf committed
	@Override
	public void postStop() throws Exception {
		LOGGER.info("stopped");
		super.postStop();
	}

}