Skip to content
Snippets Groups Projects
Commit bd5acedc authored by Christian Wulf's avatar Christian Wulf
Browse files

added log block

parent 84ac1a4d
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,10 @@ public class CounterActor extends UntypedActor { ...@@ -19,6 +19,10 @@ public class CounterActor extends UntypedActor {
@Override @Override
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("onReceive: " + message);
}
if (message instanceof StopMessage) { if (message instanceof StopMessage) {
onTerminating(); onTerminating();
} else { } else {
......
...@@ -61,11 +61,19 @@ public class TcpRecordReconstructionActor extends UntypedActor { ...@@ -61,11 +61,19 @@ public class TcpRecordReconstructionActor extends UntypedActor {
@Override @Override
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
LOGGER.debug("Message: " + message.getClass().getName()); if (LOGGER.isDebugEnabled()) {
LOGGER.debug("onReceive: " + message.getClass());
}
try { try {
if (message instanceof StartMessage) { if (message instanceof StartMessage) {
onStarting(); onStarting();
tcpMonitoringRecordReader.run(); tcpMonitoringRecordReader.run();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reading from TCP finished.");
}
self().tell(new StopMessage(), self()); self().tell(new StopMessage(), self());
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
onTerminating(); onTerminating();
......
...@@ -3,8 +3,6 @@ package akka.io.net; ...@@ -3,8 +3,6 @@ package akka.io.net;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import kieker.common.record.flow.IFlowRecord; import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -35,7 +33,7 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -35,7 +33,7 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace, public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) { final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) {
super(); super();
this.reconstructor = new TraceReconstructor(traceId2trace); this.reconstructor = new TraceReconstructor(traceId2trace, this);
this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
...@@ -44,8 +42,12 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -44,8 +42,12 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
@Override @Override
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("onReceive: " + message);
}
if (message instanceof IFlowRecord) { if (message instanceof IFlowRecord) {
reconstructor.execute((IFlowRecord) message, this); reconstructor.execute((IFlowRecord) message);
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
this.onTerminating(); this.onTerminating();
validTraceReceiver.tell(message, getSelf()); validTraceReceiver.tell(message, getSelf());
...@@ -55,8 +57,6 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -55,8 +57,6 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
} }
} }
@Override @Override
public void postStop() throws Exception { public void postStop() throws Exception {
LOGGER.info("stopped"); LOGGER.info("stopped");
...@@ -65,9 +65,10 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -65,9 +65,10 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
private void onTerminating() { private void onTerminating() {
LOGGER.trace("traces left: " + reconstructor.getTraceId2trace().keySet()); LOGGER.trace("traces left: " + reconstructor.getTraceId2trace().keySet());
reconstructor.terminate(this); reconstructor.terminate();
} }
@Override
public void sendTraceBuffer(final EventBasedTrace traceBuffer) { public void sendTraceBuffer(final EventBasedTrace traceBuffer) {
ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver; ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
receiver.tell(traceBuffer, getSelf()); receiver.tell(traceBuffer, getSelf());
......
...@@ -26,7 +26,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -26,7 +26,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer, public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer,
final Props traceAggregationReceiverProps, ActorRef watchActor) { final Props traceAggregationReceiverProps, ActorRef watchActor) {
this.reductor = new TraceReductor(trace2buffer); this.reductor = new TraceReductor(trace2buffer, this);
// output "ports" // output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps); this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps);
...@@ -35,15 +35,18 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -35,15 +35,18 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
@Override @Override
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("onReceive: " + message);
}
if (message instanceof EventBasedTrace) { if (message instanceof EventBasedTrace) {
EventBasedTrace eventBasedTrace = (EventBasedTrace) message; EventBasedTrace eventBasedTrace = (EventBasedTrace) message;
reductor.countSameTraces(eventBasedTrace); reductor.countSameTraces(eventBasedTrace);
} else if (message instanceof Long) { } else if (message instanceof Long) {
Long timestampInNs = (Long) message; Long timestampInNs = (Long) message;
if (timestampInNs != null) { if (timestampInNs != null) {
reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, this); reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs);
} }
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
this.onTerminating(); this.onTerminating();
...@@ -53,10 +56,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -53,10 +56,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
} }
} }
@Override
public void send(final TraceAggregationBuffer record) { public void send(final TraceAggregationBuffer record) {
traceAggregationReceiver.tell(record, getSelf()); traceAggregationReceiver.tell(record, getSelf());
} }
...@@ -69,7 +69,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -69,7 +69,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
private void onTerminating() { private void onTerminating() {
LOGGER.debug("Terminating..."); LOGGER.debug("Terminating...");
reductor.terminate(this); reductor.terminate();
} }
public long getMaxCollectionDuration() { public long getMaxCollectionDuration() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment