diff --git a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java index d65b49dc9e29159a7cb771a53cc562266237718b..2848dff35d69018f9c8a75805f6d8d7112da4ef6 100644 --- a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java @@ -46,14 +46,14 @@ public class TraceReductionFilter extends AbstractConsumerStage<EventBasedTrace> private long maxCollectionDurationInNs; public TraceReductionFilter(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer) { - this.reductor = new TraceReductor(trace2buffer); + this.reductor = new TraceReductor(trace2buffer, this); } @Override protected void execute(final EventBasedTrace eventBasedTrace) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { - reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, this); + reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs); } reductor.countSameTraces(eventBasedTrace); @@ -61,7 +61,7 @@ public class TraceReductionFilter extends AbstractConsumerStage<EventBasedTrace> @Override public void onTerminating() throws Exception { - reductor.terminate(this); + reductor.terminate(); // BETTER re-use processTimeoutQueue here, e.g., as follows // triggerInputPort.getPipe().add(new Date().getTime()); diff --git a/src/main/java/teetime/util/TraceReductor.java b/src/main/java/teetime/util/TraceReductor.java index 8aaebbe61a423d180acedc1c937433c538c22ae8..48d93a21427168d476f78a601343b2c336b17582 100644 --- a/src/main/java/teetime/util/TraceReductor.java +++ b/src/main/java/teetime/util/TraceReductor.java @@ -16,9 +16,11 @@ import teetime.stage.trace.traceReduction.TraceAggregationBuffer; public class TraceReductor { private final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer; + private final ISendTraceAggregationBuffer sender; - public TraceReductor(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer) { + public TraceReductor(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer, final ISendTraceAggregationBuffer sender) { this.trace2buffer = trace2buffer; + this.sender = sender; } public void countSameTraces(final EventBasedTrace eventBasedTrace) { @@ -32,7 +34,7 @@ public class TraceReductor { } } - public void processTimeoutQueue(final long timestampInNs, final long maxCollectionDurationInNs, final ISendTraceAggregationBuffer sender) { + public void processTimeoutQueue(final long timestampInNs, final long maxCollectionDurationInNs) { final long bufferTimeoutInNs = timestampInNs - maxCollectionDurationInNs; synchronized (trace2buffer) { for (final Iterator<Entry<EventBasedTrace, TraceAggregationBuffer>> iterator = trace2buffer @@ -46,7 +48,7 @@ public class TraceReductor { } } - public void terminate(final ISendTraceAggregationBuffer sender) { + public void terminate() { synchronized (trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<EventBasedTrace, TraceAggregationBuffer> entry : trace2buffer.entrySet()) { final TraceAggregationBuffer aggregatedTrace = entry.getValue();