Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package kiekerdays.tcptracereduction;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class TcpTraceReductionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);
private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
private ActorRef traceAggregationReceiver;
private long maxCollectionDurationInNs;
public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer, Props traceAggregationProps) {
this.trace2buffer = trace2buffer;
// output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof TraceEventRecords) {
TraceEventRecords traceEventRecords = (TraceEventRecords) message;
final long timestamp = System.nanoTime();
this.countSameTraces(traceEventRecords, timestamp);
} else if (message instanceof Long) {
Long timestampInNs = (Long) message;
if (timestampInNs != null) {
this.processTimeoutQueue(timestampInNs);
}
} else {
unhandled(message);
}
}
private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
synchronized (this.trace2buffer) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) {
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
}
traceBuffer.count();
}
}
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
synchronized (this.trace2buffer) {
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
send(record);
}
iterator.remove();
}
}
}
private void send(final TraceEventRecords record) {
traceAggregationReceiver.tell(record, getSelf());
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped");
onTerminating();
super.postStop();
}
private void onTerminating() {
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
record.setCount(buffer.getCount());
send(record);
}
this.trace2buffer.clear();
}
}
public long getMaxCollectionDuration() {
return this.maxCollectionDurationInNs;
}
public void setMaxCollectionDuration(final long maxCollectionDuration) {
this.maxCollectionDurationInNs = maxCollectionDuration;
}
}