Skip to content
Snippets Groups Projects
Commit 84ac1a4d authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

moved fields into logic classes

parent 0aa8cbe7
No related branches found
No related tags found
No related merge requests found
...@@ -28,14 +28,14 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -28,14 +28,14 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
private long maxTraceTimeout = Long.MAX_VALUE; private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1; private long maxEncounteredLoggingTimestamp = -1;
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace; private TraceReconstructor reconstructor;
private final ActorRef validTraceReceiver; private final ActorRef validTraceReceiver;
private final ActorRef invalidTraceReceiver; private final ActorRef invalidTraceReceiver;
public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace, public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) { final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) {
super(); super();
this.traceId2trace = traceId2trace; this.reconstructor = new TraceReconstructor(traceId2trace);
this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
...@@ -45,7 +45,7 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -45,7 +45,7 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
@Override @Override
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (message instanceof IFlowRecord) { if (message instanceof IFlowRecord) {
TraceReconstructor.execute((IFlowRecord) message, traceId2trace, this); reconstructor.execute((IFlowRecord) message, this);
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
this.onTerminating(); this.onTerminating();
validTraceReceiver.tell(message, getSelf()); validTraceReceiver.tell(message, getSelf());
...@@ -64,8 +64,8 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr ...@@ -64,8 +64,8 @@ public class TcpTraceReconstructionActor extends UntypedActor implements ISendTr
} }
private void onTerminating() { private void onTerminating() {
LOGGER.trace("traces left: " + traceId2trace.keySet()); LOGGER.trace("traces left: " + reconstructor.getTraceId2trace().keySet());
TraceReconstructor.terminate(traceId2trace, this); reconstructor.terminate(this);
} }
public void sendTraceBuffer(final EventBasedTrace traceBuffer) { public void sendTraceBuffer(final EventBasedTrace traceBuffer) {
......
package akka.io.net; package akka.io.net;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -21,14 +19,14 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -21,14 +19,14 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class); private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);
private final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer; private final TraceReductor reductor;
private final ActorRef traceAggregationReceiver; private final ActorRef traceAggregationReceiver;
private long maxCollectionDurationInNs; private long maxCollectionDurationInNs;
public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer, public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer,
final Props traceAggregationReceiverProps, ActorRef watchActor) { final Props traceAggregationReceiverProps, ActorRef watchActor) {
this.trace2buffer = trace2buffer; this.reductor = new TraceReductor(trace2buffer);
// output "ports" // output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps); this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps);
...@@ -39,13 +37,13 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -39,13 +37,13 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
public void onReceive(final Object message) throws Exception { public void onReceive(final Object message) throws Exception {
if (message instanceof EventBasedTrace) { if (message instanceof EventBasedTrace) {
EventBasedTrace eventBasedTrace = (EventBasedTrace) message; EventBasedTrace eventBasedTrace = (EventBasedTrace) message;
TraceReductor.countSameTraces(eventBasedTrace, trace2buffer); reductor.countSameTraces(eventBasedTrace);
} else if (message instanceof Long) { } else if (message instanceof Long) {
Long timestampInNs = (Long) message; Long timestampInNs = (Long) message;
if (timestampInNs != null) { if (timestampInNs != null) {
TraceReductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, trace2buffer, this); reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, this);
} }
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
this.onTerminating(); this.onTerminating();
...@@ -71,7 +69,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg ...@@ -71,7 +69,7 @@ public class TcpTraceReductionActor extends UntypedActor implements ISendTraceAg
private void onTerminating() { private void onTerminating() {
LOGGER.debug("Terminating..."); LOGGER.debug("Terminating...");
TraceReductor.terminate(trace2buffer, this); reductor.terminate(this);
} }
public long getMaxCollectionDuration() { public long getMaxCollectionDuration() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment