From bd5acedc65ec96bffdc481f93ff74f3cffbd7193 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 25 Feb 2015 09:18:47 +0100 Subject: [PATCH] added log block --- src/main/java/akka/common/CounterActor.java | 4 ++++ .../io/net/TcpRecordReconstructionActor.java | 10 +++++++++- .../io/net/TcpTraceReconstructionActor.java | 15 ++++++++------- .../akka/io/net/TcpTraceReductionActor.java | 18 +++++++++--------- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/main/java/akka/common/CounterActor.java b/src/main/java/akka/common/CounterActor.java index bc4d196..5084035 100644 --- a/src/main/java/akka/common/CounterActor.java +++ b/src/main/java/akka/common/CounterActor.java @@ -19,6 +19,10 @@ public class CounterActor extends UntypedActor { @Override public void onReceive(final Object message) throws Exception { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("onReceive: " + message); + } + if (message instanceof StopMessage) { onTerminating(); } else { diff --git a/src/main/java/akka/io/net/TcpRecordReconstructionActor.java b/src/main/java/akka/io/net/TcpRecordReconstructionActor.java index 9ea8d4b..c597da6 100644 --- a/src/main/java/akka/io/net/TcpRecordReconstructionActor.java +++ b/src/main/java/akka/io/net/TcpRecordReconstructionActor.java @@ -61,11 +61,19 @@ public class TcpRecordReconstructionActor extends UntypedActor { @Override public void onReceive(final Object message) throws Exception { - LOGGER.debug("Message: " + message.getClass().getName()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("onReceive: " + message.getClass()); + } + try { if (message instanceof StartMessage) { onStarting(); tcpMonitoringRecordReader.run(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Reading from TCP finished."); + } + self().tell(new StopMessage(), self()); } else if (message instanceof StopMessage) { onTerminating(); diff --git a/src/main/java/akka/io/net/TcpTraceReconstructionActor.java b/src/main/java/akka/io/net/TcpTraceReconstructionActor.java index f1c3ef7..dd1490c 100644 --- a/src/main/java/akka/io/net/TcpTraceReconstructionActor.java +++ b/src/main/java/akka/io/net/TcpTraceReconstructionActor.java @@ -3,8 +3,6 @@ package akka.io.net; import java.util.concurrent.TimeUnit; import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.AbstractTraceEvent; -import kieker.common.record.flow.trace.TraceMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +33,7 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace, final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) { super(); - this.reconstructor = new TraceReconstructor(traceId2trace); + this.reconstructor = new TraceReconstructor(traceId2trace, this); this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); @@ -44,8 +42,12 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr @Override public void onReceive(final Object message) throws Exception { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("onReceive: " + message); + } + if (message instanceof IFlowRecord) { - reconstructor.execute((IFlowRecord) message, this); + reconstructor.execute((IFlowRecord) message); } else if (message instanceof StopMessage) { this.onTerminating(); validTraceReceiver.tell(message, getSelf()); @@ -55,8 +57,6 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr } } - - @Override public void postStop() throws Exception { LOGGER.info("stopped"); @@ -65,9 +65,10 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr private void onTerminating() { LOGGER.trace("traces left: " + reconstructor.getTraceId2trace().keySet()); - reconstructor.terminate(this); + reconstructor.terminate(); } + @Override public void sendTraceBuffer(final EventBasedTrace traceBuffer) { ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver; receiver.tell(traceBuffer, getSelf()); diff --git a/src/main/java/akka/io/net/TcpTraceReductionActor.java b/src/main/java/akka/io/net/TcpTraceReductionActor.java index fefc11d..80cdad1 100644 --- a/src/main/java/akka/io/net/TcpTraceReductionActor.java +++ b/src/main/java/akka/io/net/TcpTraceReductionActor.java @@ -26,7 +26,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer, final Props traceAggregationReceiverProps, ActorRef watchActor) { - this.reductor = new TraceReductor(trace2buffer); + this.reductor = new TraceReductor(trace2buffer, this); // output "ports" this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps); @@ -35,15 +35,18 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg @Override public void onReceive(final Object message) throws Exception { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("onReceive: " + message); + } + if (message instanceof EventBasedTrace) { EventBasedTrace eventBasedTrace = (EventBasedTrace) message; reductor.countSameTraces(eventBasedTrace); - - + } else if (message instanceof Long) { Long timestampInNs = (Long) message; if (timestampInNs != null) { - reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, this); + reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs); } } else if (message instanceof StopMessage) { this.onTerminating(); @@ -53,10 +56,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg } } - - - - + @Override public void send(final TraceAggregationBuffer record) { traceAggregationReceiver.tell(record, getSelf()); } @@ -69,7 +69,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg private void onTerminating() { LOGGER.debug("Terminating..."); - reductor.terminate(this); + reductor.terminate(); } public long getMaxCollectionDuration() { -- GitLab