From a3ace7bbca716b87b273fdc8ad9234bca34e631f Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 25 Feb 2015 09:17:21 +0100 Subject: [PATCH] moved ISendTraceAggregationBuffer to ctor --- .../stage/trace/traceReduction/TraceReductionFilter.java | 6 +++--- src/main/java/teetime/util/TraceReductor.java | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java index d65b49dc..2848dff3 100644 --- a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java @@ -46,14 +46,14 @@ public class TraceReductionFilter extends AbstractConsumerStage<EventBasedTrace> private long maxCollectionDurationInNs; public TraceReductionFilter(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer) { - this.reductor = new TraceReductor(trace2buffer); + this.reductor = new TraceReductor(trace2buffer, this); } @Override protected void execute(final EventBasedTrace eventBasedTrace) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { - reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs, this); + reductor.processTimeoutQueue(timestampInNs, maxCollectionDurationInNs); } reductor.countSameTraces(eventBasedTrace); @@ -61,7 +61,7 @@ public class TraceReductionFilter extends AbstractConsumerStage<EventBasedTrace> @Override public void onTerminating() throws Exception { - reductor.terminate(this); + reductor.terminate(); // BETTER re-use processTimeoutQueue here, e.g., as follows // triggerInputPort.getPipe().add(new Date().getTime()); diff --git a/src/main/java/teetime/util/TraceReductor.java b/src/main/java/teetime/util/TraceReductor.java index 8aaebbe6..48d93a21 100644 --- a/src/main/java/teetime/util/TraceReductor.java +++ b/src/main/java/teetime/util/TraceReductor.java @@ -16,9 +16,11 @@ import teetime.stage.trace.traceReduction.TraceAggregationBuffer; public class TraceReductor { private final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer; + private final ISendTraceAggregationBuffer sender; - public TraceReductor(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer) { + public TraceReductor(final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer, final ISendTraceAggregationBuffer sender) { this.trace2buffer = trace2buffer; + this.sender = sender; } public void countSameTraces(final EventBasedTrace eventBasedTrace) { @@ -32,7 +34,7 @@ public class TraceReductor { } } - public void processTimeoutQueue(final long timestampInNs, final long maxCollectionDurationInNs, final ISendTraceAggregationBuffer sender) { + public void processTimeoutQueue(final long timestampInNs, final long maxCollectionDurationInNs) { final long bufferTimeoutInNs = timestampInNs - maxCollectionDurationInNs; synchronized (trace2buffer) { for (final Iterator<Entry<EventBasedTrace, TraceAggregationBuffer>> iterator = trace2buffer @@ -46,7 +48,7 @@ public class TraceReductor { } } - public void terminate(final ISendTraceAggregationBuffer sender) { + public void terminate() { synchronized (trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<EventBasedTrace, TraceAggregationBuffer> entry : trace2buffer.entrySet()) { final TraceAggregationBuffer aggregatedTrace = entry.getValue(); -- GitLab