package akka.io.net;

import java.util.concurrent.TimeUnit;

import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.util.ISendTraceBuffer;
import teetime.util.TraceReconstructor;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.common.StopMessage;
import akka.common.WatchActor.WatchMe;

/**
 * Actor that passes every received {@link IFlowRecord} to a {@link TraceReconstructor} and
 * forwards the trace buffers handed back through {@link #sendTraceBuffer(EventBasedTrace)} to one
 * of two child actors: buffers reporting {@code isInvalid()} go to the invalid-trace receiver, all
 * others go to the valid-trace receiver.
 *
 * <p>A {@link StopMessage} terminates the reconstructor and is then relayed to both child actors.
 */
public class TcpTraceReconstructionActor extends UntypedActor implements ISendTraceBuffer {

    private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReconstructionActor.class);

    // The following four values are only stored and exposed through the accessors below;
    // nothing else in this class reads them.
    private TimeUnit timeunit;
    private long maxTraceDuration = Long.MAX_VALUE;
    private long maxTraceTimeout = Long.MAX_VALUE;
    private long maxEncounteredLoggingTimestamp = -1;

    private final TraceReconstructor reconstructor;
    private final ActorRef validTraceReceiver;
    private final ActorRef invalidTraceReceiver;

    /**
     * Creates the reconstructor, spawns both receiver actors as children of this actor and
     * announces this actor to {@code watchActor} with a {@link WatchMe} message.
     *
     * @param traceId2trace
     *            map handed to the {@link TraceReconstructor}
     * @param validTraceReceiverProps
     *            props used to create the child actor that receives valid traces
     * @param invalidTraceReceiverProps
     *            props used to create the child actor that receives invalid traces
     * @param watchActor
     *            actor that is told {@code WatchMe(self)}
     */
    public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
            final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, final ActorRef watchActor) {
        super();
        this.reconstructor = new TraceReconstructor(traceId2trace);

        this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
        this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);

        watchActor.tell(new WatchMe(getSelf()), getSelf());
    }

    /**
     * Dispatches an incoming message.
     *
     * <ul>
     * <li>{@link IFlowRecord}: executed on the reconstructor, with this actor as the trace sink.</li>
     * <li>{@link StopMessage}: the reconstructor is terminated first, then the same message is
     * relayed to both receiver actors.</li>
     * <li>anything else: reported via {@code unhandled}.</li>
     * </ul>
     *
     * @param message
     *            the received message
     * @throws Exception
     *             declared by the overridden method
     */
    @Override
    public void onReceive(final Object message) throws Exception {
        if (message instanceof IFlowRecord) {
            reconstructor.execute((IFlowRecord) message, this);
        } else if (message instanceof StopMessage) {
            // Terminate before relaying so that anything emitted during termination is
            // enqueued at the receivers ahead of the stop message.
            this.onTerminating();
            validTraceReceiver.tell(message, getSelf());
            invalidTraceReceiver.tell(message, getSelf());
        } else {
            unhandled(message);
        }
    }

    /**
     * Logs that this actor has stopped and delegates to the superclass hook.
     *
     * @throws Exception
     *             declared by the overridden method
     */
    @Override
    public void postStop() throws Exception {
        LOGGER.info("stopped");
        super.postStop();
    }

    /**
     * Logs the ids of the traces still held by the reconstructor and terminates it, passing this
     * actor as the trace sink. NOTE(review): presumably termination flushes the remaining traces
     * through {@link #sendTraceBuffer(EventBasedTrace)}; confirm against {@link TraceReconstructor}.
     */
    private void onTerminating() {
        // Parameterized form: the key set is only rendered to a string when TRACE is enabled.
        LOGGER.trace("traces left: {}", reconstructor.getTraceId2trace().keySet());
        reconstructor.terminate(this);
    }

    /**
     * Sends the given trace buffer to the invalid-trace receiver if it reports
     * {@code isInvalid()}, otherwise to the valid-trace receiver.
     *
     * @param traceBuffer
     *            the trace buffer to forward
     */
    public void sendTraceBuffer(final EventBasedTrace traceBuffer) {
        final ActorRef receiver = traceBuffer.isInvalid() ? invalidTraceReceiver : validTraceReceiver;
        receiver.tell(traceBuffer, getSelf());
    }

    /**
     * @return the stored time unit; {@code null} until one has been set
     */
    public TimeUnit getTimeunit() {
        return timeunit;
    }

    /**
     * @param timeunit
     *            the time unit to store
     */
    public void setTimeunit(final TimeUnit timeunit) {
        this.timeunit = timeunit;
    }

    /**
     * @return the stored maximum trace duration; {@link Long#MAX_VALUE} unless changed
     */
    public long getMaxTraceDuration() {
        return maxTraceDuration;
    }

    /**
     * @param maxTraceDuration
     *            the maximum trace duration to store
     */
    public void setMaxTraceDuration(final long maxTraceDuration) {
        this.maxTraceDuration = maxTraceDuration;
    }

    /**
     * @return the stored maximum trace timeout; {@link Long#MAX_VALUE} unless changed
     */
    public long getMaxTraceTimeout() {
        return maxTraceTimeout;
    }

    /**
     * @param maxTraceTimeout
     *            the maximum trace timeout to store
     */
    public void setMaxTraceTimeout(final long maxTraceTimeout) {
        this.maxTraceTimeout = maxTraceTimeout;
    }

    /**
     * @return the stored maximum encountered logging timestamp; {@code -1} unless changed
     */
    public long getMaxEncounteredLoggingTimestamp() {
        return maxEncounteredLoggingTimestamp;
    }

    /**
     * @param maxEncounteredLoggingTimestamp
     *            the maximum encountered logging timestamp to store
     */
    public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
        this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
    }
}