package kiekerdays.tcptracereduction;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;

import kieker.analysis.plugin.filter.flow.TraceEventRecords;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

/**
 * Actor that reduces a stream of traces by counting traces that map to the same key in
 * {@code trace2buffer} and forwarding one representative record, annotated with its count, to a
 * child actor.
 *
 * <p>Handled messages:
 * <ul>
 * <li>{@link TraceEventRecords}: counted in the aggregation buffer for that trace.</li>
 * <li>{@link Long}: treated as a timestamp in nanoseconds; buffers whose creation timestamp is at
 * least {@code maxCollectionDuration} older than it are forwarded and removed.</li>
 * <li>anything else: passed to {@code unhandled}.</li>
 * </ul>
 *
 * <p>All access to the buffer map is guarded by {@code synchronized (trace2buffer)} because the
 * map instance is supplied by the caller.
 */
public class TcpTraceReductionActor extends UntypedActor {

	private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);

	/** Aggregation buffers keyed by trace; which traces count as "the same" is decided by the map. */
	private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
	/** Output "port": the child actor that receives the aggregated records. */
	private final ActorRef traceAggregationReceiver;
	/** Collection window in nanoseconds; stays 0 until {@link #setMaxCollectionDuration(long)} is called. */
	private long maxCollectionDurationInNs;

	/**
	 * @param trace2buffer
	 *            map holding one aggregation buffer per distinct trace
	 * @param traceAggregationProps
	 *            props used to create the child actor that receives the aggregated records
	 */
	public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer,
			final Props traceAggregationProps) {
		this.trace2buffer = trace2buffer;

		// output "ports"
		this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
	}

	/**
	 * Dispatches on the message type: traces are counted, {@code Long} timestamps trigger a
	 * timeout sweep, everything else is reported as unhandled.
	 *
	 * @param message
	 *            the received message
	 */
	@Override
	public void onReceive(final Object message) throws Exception {
		if (message instanceof TraceEventRecords) {
			final TraceEventRecords traceEventRecords = (TraceEventRecords) message;
			// The arrival time is used as creation timestamp if a new buffer has to be created.
			final long timestamp = System.nanoTime();
			this.countSameTraces(traceEventRecords, timestamp);
		} else if (message instanceof Long) {
			// instanceof already guarantees non-null, so no extra null check is required.
			final Long timestampInNs = (Long) message;
			this.processTimeoutQueue(timestampInNs);
		} else {
			unhandled(message);
		}
	}

	/**
	 * Increments the counter of the buffer belonging to the given trace, creating the buffer on
	 * first sight.
	 *
	 * @param traceEventRecords
	 *            the trace to count
	 * @param timestamp
	 *            creation timestamp handed to a newly created buffer
	 */
	private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
		synchronized (this.trace2buffer) {
			TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
			if (traceBuffer == null) {
				traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
				this.trace2buffer.put(traceEventRecords, traceBuffer);
			}
			traceBuffer.count();
		}
	}

	/**
	 * Forwards and removes every buffer whose creation timestamp is not newer than
	 * {@code timestampInNs - maxCollectionDurationInNs}. Buffers that have not timed out yet stay
	 * in the map so that they keep aggregating.
	 *
	 * @param timestampInNs
	 *            reference time in nanoseconds; presumably on the same clock as
	 *            {@link System#nanoTime()}, which stamps the buffers - TODO confirm with the sender
	 */
	private void processTimeoutQueue(final long timestampInNs) {
		final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
		synchronized (this.trace2buffer) {
			// The map is ordered by key, not by creation time, so every entry has to be inspected.
			final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator();
			while (iterator.hasNext()) {
				final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
				if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
					final TraceEventRecords record = traceBuffer.getTraceEventRecords();
					record.setCount(traceBuffer.getCount());
					send(record);
					// Remove only what has been flushed; removing unconditionally would silently
					// drop the counts of all buffers that are still inside the collection window.
					iterator.remove();
				}
			}
		}
	}

	/**
	 * Sends the record to the aggregation receiver with this actor as sender.
	 *
	 * @param record
	 *            the aggregated record to forward
	 */
	private void send(final TraceEventRecords record) {
		this.traceAggregationReceiver.tell(record, getSelf());
	}

	/**
	 * Logs the stop and flushes all remaining buffers before delegating to the superclass.
	 *
	 * <p>NOTE(review): the receiver is a child of this actor; Akka appears to stop children before
	 * the parent's {@code postStop} runs, so these final records may end up in dead letters -
	 * confirm against the Akka lifecycle documentation.
	 */
	@Override
	public void postStop() throws Exception {
		LOGGER.info("stopped");

		onTerminating();
		super.postStop();
	}

	/**
	 * Forwards every buffered trace with its current count, regardless of age, and clears the map.
	 */
	private void onTerminating() {
		synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
			for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
				final TraceAggregationBuffer buffer = entry.getValue();
				final TraceEventRecords record = buffer.getTraceEventRecords();
				record.setCount(buffer.getCount());
				send(record);
			}
			this.trace2buffer.clear();
		}
	}

	/**
	 * @return the collection window in nanoseconds
	 */
	public long getMaxCollectionDuration() {
		return this.maxCollectionDurationInNs;
	}

	/**
	 * @param maxCollectionDuration
	 *            the collection window in nanoseconds
	 */
	public void setMaxCollectionDuration(final long maxCollectionDuration) {
		this.maxCollectionDurationInNs = maxCollectionDuration;
	}
}