diff --git a/src/main/java/teetime/util/concurrent/hashmap/ConcurrentHashMapWithDefault.java b/src/main/java/teetime/util/concurrent/hashmap/ConcurrentHashMapWithDefault.java index 6ddaefc5e2fdb53f46770d510828ee9657a9e7cf..913b0724253d00b9b4f93d61c3fa061f65c1ce85 100644 --- a/src/main/java/teetime/util/concurrent/hashmap/ConcurrentHashMapWithDefault.java +++ b/src/main/java/teetime/util/concurrent/hashmap/ConcurrentHashMapWithDefault.java @@ -1,57 +1,35 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ package teetime.util.concurrent.hashmap; import java.util.concurrent.ConcurrentHashMap; -/** - * @author Christian Wulf - * - * @since 1.10 - */ public class ConcurrentHashMapWithDefault<K, V> extends ConcurrentHashMap<K, V> { - private static final long serialVersionUID = -7958038532219740472L; + private static final long serialVersionUID = 199185976241037967L; private final ValueFactory<V> valueFactory; - /** - * @since 1.10 - */ + private int maxElements; + public ConcurrentHashMapWithDefault(final ValueFactory<V> valueFactory) { this.valueFactory = valueFactory; } - /** - * @return the corresponding value if the key exists. Otherwise, it creates, - * inserts, and returns a new default value. - */ - @SuppressWarnings("unchecked") - @Override - public V get(final Object key) { - V value = super.get(key); + public V getOrCreate(final K key) { + V value = this.get(key); if (value == null) { synchronized (this) { - value = super.get(key); + value = this.get(key); if (value == null) { // NOCS (DCL) value = this.valueFactory.create(); - super.put((K) key, value); + this.put(key, value); + this.maxElements++; } } } return value; } + + public int getMaxElements() { + return this.maxElements; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 339158832b73f3e86f8ac480c083c41798c96cee..49a4cbde7e21603e4db51350061db98bf2307d36 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -98,6 +98,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { this.reschedulable = reschedulable; } + @Override public String getId() { return this.id; } @@ -107,7 +108,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { */ @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.info("Got signal: " + signal + " from input port: " + inputPort); + this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); switch (signal) { case FINISHED: diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 02fdaa01254a3d4e2f874329731e43e8f50df5c2..4e85387bd9b3ae724f94768ee5a6eef5d6563f6d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -33,10 +33,15 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>(); public Pipeline() { - this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name + this(UUID.randomUUID().toString()); + } + + public Pipeline(final String id) { + this.id = id; // the id should only be represented by a UUID, not additionally by the class name this.logger = LogFactory.getLog(this.id); } + @Override public String getId() { return this.id; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index cb1e9329ef0c8c633e037e77d4c67709598cd723..2b0c752e9f6c0004b6a36eeb7967052515b26b34 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -30,5 +30,7 @@ public class RunnableStage<I> implements Runnable { this.logger.error("Terminating thread due to the following exception: ", e); throw e; } + + this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 84bc4309cfd5c7e8a81ce75802cce903f51c75b6..8c993238f87c32f1ae6622366ca1f9cd7bded17d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -2,6 +2,8 @@ package teetime.variant.methodcallWithPorts.framework.core; public interface StageWithPort<I, O> { + String getId(); + InputPort<I> getInputPort(); OutputPort<O> getOutputPort(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java similarity index 88% rename from src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java index b1fb1b2677b2b87c2868bf51f8dbf172f132db33..51831541aeff5b13ac2383e9b37c0d20319def97 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; -public class CountingFilter<T> extends ConsumerStage<T, T> { +public class Counter<T> extends ConsumerStage<T, T> { private int numElementsPassed; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java index 62a092f5d569b48a8c14b86a285db56b113415d9..e31ef23e06d38664c08c3be3624bab3708a5d290 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java @@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage; import java.util.LinkedList; import java.util.List; +import java.util.UUID; import teetime.util.ConstructorClosure; import teetime.variant.methodcallWithPorts.framework.core.InputPort; @@ -17,6 +18,12 @@ public class EndStage<T> implements StageWithPort<T, T> { public ConstructorClosure<?> closure; public List<Object> list = new LinkedList<Object>(); + private final String id; + + public EndStage() { + this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name + } + @Override public void onIsPipelineHead() { // do nothing @@ -70,4 +77,9 @@ public class EndStage<T> implements StageWithPort<T, T> { // do nothing } + @Override + public String getId() { + return this.id; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java new file mode 100644 index 0000000000000000000000000000000000000000..0da3a138481376c5ede170d9d16eb7038e57b74c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java @@ -0,0 +1,27 @@ +package teetime.variant.methodcallWithPorts.stage; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> { + + private final Class<C> type; + private int counter; + + public InstanceCounter(final Class<C> type) { + this.type = type; + } + + @Override + protected void execute5(final T element) { + if (this.type.isInstance(element)) { + this.counter++; + } + + this.send(element); + } + + public int getCounter() { + return this.counter; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java index a205d20b6232866c5eaacd6f0c607227e5f33841..bf3ca12bb74b3f329c79dad6e8576e8f4719e063 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java @@ -5,15 +5,11 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; /** * @author Jan Waller, Nils Christian Ehmke, Christian Wulf * - * @since 1.10 */ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { private Class<O> type; - /** - * @since 1.10 - */ public InstanceOfFilter(final Class<O> type) { this.type = type; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index dcc893c4fe0caca1e0b930f02a449ce970b081c0..cbd26440a555efbdaa9e1c6263a3e898397135ff 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -16,9 +16,9 @@ package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.TimeUnit; +import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; @@ -39,30 +39,27 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE private long maxTraceTimeout = Long.MAX_VALUE; private long maxEncounteredLoggingTimestamp = -1; - private final Map<Long, TraceBuffer> traceId2trace; + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace; - public TraceReconstructionFilter(final Map<Long, TraceBuffer> traceId2trace) { + public TraceReconstructionFilter(final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace) { super(); this.traceId2trace = traceId2trace; } @Override protected void execute5(final IFlowRecord element) { - // synchronized (this.traceId2trace) {// TODO remove if everything works final Long traceId = this.reconstructTrace(element); if (traceId != null) { this.putIfFinished(traceId); } - // } } private void putIfFinished(final Long traceId) { final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); - if (traceBuffer.isFinished()) { - synchronized (this.traceId2trace) { - if (null != this.traceId2trace.remove(traceId)) { - this.put(traceBuffer); - } + if (traceBuffer != null && traceBuffer.isFinished()) { // null-check to check whether the trace has already been sent and removed + boolean removed = null != this.traceId2trace.remove(traceId); + if (removed) { + this.put(traceBuffer); } } } @@ -71,12 +68,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE Long traceId = null; if (record instanceof TraceMetadata) { traceId = ((TraceMetadata) record).getTraceId(); - TraceBuffer traceBuffer = this.traceId2trace.get(traceId); + TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId); traceBuffer.setTrace((TraceMetadata) record); } else if (record instanceof AbstractTraceEvent) { traceId = ((AbstractTraceEvent) record).getTraceId(); - TraceBuffer traceBuffer = this.traceId2trace.get(traceId); + TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId); traceBuffer.insertEvent((AbstractTraceEvent) record); } @@ -91,7 +88,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE while (iterator.hasNext()) { TraceBuffer traceBuffer = iterator.next(); if (traceBuffer.isFinished()) { // FIXME remove isFinished - this.put(traceBuffer); + this.put(traceBuffer); // BETTER put outside of synchronized iterator.remove(); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwComparisonMethodcallWithPorts.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwComparisonMethodcallWithPorts.java index 118f667e03f85f2d4aaeee00e34ef7eda3308cd8..b44a916dd1c902147697f356a28b158cb54d2f1a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwComparisonMethodcallWithPorts.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwComparisonMethodcallWithPorts.java @@ -106,7 +106,7 @@ public class ChwComparisonMethodcallWithPorts { // since 04.07.2014 (incl.) assertEquals(86, value14, 5.1); // +16 assertEquals(26, value10, 2.1); // +0 - assertEquals(37, value11, 4.1); // -7 + assertEquals(41, value11, 4.1); // -3 assertEquals(42, value9, 2.1); // +6 assertEquals(44, value15, 4.1); // +0 assertEquals(53, value17, 4.1); // +0 diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index bc169bd833e2ab9e73c9a7bde2f74077bf4e8c61..5146d97d5053c89c48b852fd94704f1333b93ad3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -2,7 +2,6 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; import java.util.LinkedList; import java.util.List; -import java.util.Map; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -39,14 +38,14 @@ public class TcpTraceReconstruction extends Analysis { public void init() { super.init(); StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline); + this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); } } @@ -63,7 +62,7 @@ public class TcpTraceReconstruction extends Analysis { return pipeline; } - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) { // create stages diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index 845d18e51b2252908be77733516ec255a235c2a4..ca939ce17bf07af9905baba25804c0004f324182 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -45,17 +45,17 @@ public class TcpTraceReduction extends Analysis { public void init() { super.init(); StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); StageWithPort<Void, Long> clockStage = this.buildClockPipeline(5000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); + this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); } } @@ -87,7 +87,7 @@ public class TcpTraceReduction extends Analysis { return pipeline; } - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java index c9f5718a55ea7891dbf74483c16da6204dadde6f..bc99338571a85498c2cbb3895c4eb4e25f0954d9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java @@ -46,8 +46,8 @@ public class RecordReaderAnalysis extends Analysis { @Override public void init() { super.init(); - Pipeline<?, ?> producerPipeline = this.buildProducerPipeline(); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); + Pipeline<File, ?> producerPipeline = this.buildProducerPipeline(); + this.producerThread = new Thread(new RunnableStage<File>(producerPipeline)); } private Pipeline<File, Void> buildProducerPipeline() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 735590887090505a07be11d218c54d13ec5a66d1..f94e75796ed35982bcfc665550620303522434ed 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -2,7 +2,6 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; -import java.util.Map; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -13,7 +12,7 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; -import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; @@ -33,10 +32,10 @@ public class TcpTraceReconstructionAnalysis extends Analysis { private Thread clock2Thread; private Thread workerThread; - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private CountingFilter<IMonitoringRecord> recordCounter; - private CountingFilter<TraceEventRecords> traceCounter; + private Counter<IMonitoringRecord> recordCounter; + private Counter<TraceEventRecords> traceCounter; private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; @@ -44,13 +43,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis { public void init() { super.init(); StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage)); - Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); - this.workerThread = new Thread(new RunnableStage(pipeline)); + Pipeline<Void, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); + this.workerThread = new Thread(new RunnableStage<Void>(pipeline)); } private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { @@ -70,13 +69,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis { private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages TCPReader tcpReader = new TCPReader(); - this.recordCounter = new CountingFilter<IMonitoringRecord>(); + this.recordCounter = new Counter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>(); - this.traceCounter = new CountingFilter<TraceEventRecords>(); + this.traceCounter = new Counter<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // connect stages diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index c85f066cbbe3a8b82027577d0f39ac0536deb964..cbfe94b3c5702bc7235f2157e2bb4b179c9e2730 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -3,7 +3,6 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction; import java.io.File; import java.util.LinkedList; import java.util.List; -import java.util.Map; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -16,7 +15,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Cache; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; -import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; @@ -38,10 +37,10 @@ public class TraceReconstructionAnalysis extends Analysis { private Thread workerThread; private ClassNameRegistryRepository classNameRegistryRepository; - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private CountingFilter<IMonitoringRecord> recordCounter; - private CountingFilter<TraceEventRecords> traceCounter; + private Counter<IMonitoringRecord> recordCounter; + private Counter<TraceEventRecords> traceCounter; private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; private File inputDir; @@ -68,7 +67,7 @@ public class TraceReconstructionAnalysis extends Analysis { // create stages final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository); - this.recordCounter = new CountingFilter<IMonitoringRecord>(); + this.recordCounter = new Counter<IMonitoringRecord>(); final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>(); final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>(); @@ -76,7 +75,7 @@ public class TraceReconstructionAnalysis extends Analysis { IFlowRecord.class); this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); - this.traceCounter = new CountingFilter<TraceEventRecords>(); + this.traceCounter = new Counter<TraceEventRecords>(); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); // configure stages diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index a8130d71d5d6c38bb781deed0cfca6d362a58717..fad06d80c5b9fe52768d88fda5a09b5b45b29049 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -40,6 +40,8 @@ import kieker.common.record.IMonitoringRecord; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { + private static final int EXPECTED_NUM_TRACES = 1000000; + private StopWatch stopWatch; @Before @@ -101,6 +103,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { } System.out.println("Max size of tcp-relay pipe: " + maxSize); + // System.out.println("#trace meta data read: " + analysis.getNumTraceMetadatas()); + // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); + // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); @@ -115,11 +120,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - // assertEquals(21001, analysis.getNumRecords()); assertEquals("#records", 21000001, analysis.getNumRecords()); - // assertEquals(1000, analysis.getNumTraces()); - assertEquals("#traces", 1000000, analysis.getNumTraces()); + for (Integer count : analysis.getNumTraceMetadatas()) { + assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution + } + + assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); } public static void main(final String[] args) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index bc44c68d3f2266b3bec93eb7ce4e9cc33f867dc6..0fb446fb74f08635ab75046507665e5fa24b24d3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -5,7 +5,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Map; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -16,10 +15,11 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; -import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; +import teetime.variant.methodcallWithPorts.stage.InstanceCounter; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; @@ -29,6 +29,7 @@ import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.Trac import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.TraceMetadata; public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { @@ -72,7 +73,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); + Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>("TCP reader pipeline"); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; @@ -123,11 +124,13 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } } - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; - private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; + private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory; + private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); @@ -135,9 +138,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { @SuppressWarnings({ "rawtypes", "unchecked" }) public TcpTraceReconstructionAnalysisWithThreads() { try { - this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); - this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); + this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class)); + this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); } catch (NoSuchMethodException e) { throw new IllegalArgumentException(e); @@ -150,12 +155,14 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { final StageWithPort<Void, Long> clockStage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); - CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class); + new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); - final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); - CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace); + Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); @@ -166,7 +173,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); - SingleElementPipe.connect(recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); + SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); @@ -189,10 +197,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline"); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(recordCounter); // pipeline.addIntermediateStage(recordThroughputFilter); + pipeline.addIntermediateStage(traceMetadataCounter); pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); @@ -200,6 +209,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { pipeline.addIntermediateStage(traceCounter); // pipeline.addIntermediateStage(sysout); pipeline.setLastStage(endStage); + return pipeline; } @@ -234,7 +244,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public int getNumRecords() { int sum = 0; - for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { + for (Counter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { sum += stage.getNumElementsPassed(); } return sum; @@ -242,7 +252,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public int getNumTraces() { int sum = 0; - for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { + for (Counter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { sum += stage.getNumElementsPassed(); } return sum; @@ -264,6 +274,14 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return throughputs; } + public List<Integer> getNumTraceMetadatas() { + List<Integer> numTraceMetadatas = new LinkedList<Integer>(); + for (InstanceCounter<IMonitoringRecord, TraceMetadata> stage : this.traceMetadataCounterFactory.getStages()) { + numTraceMetadatas.add(stage.getCounter()); + } + return numTraceMetadatas; + } + public List<SpScPipe<IMonitoringRecord>> getTcpRelayPipes() { return this.tcpRelayPipes; } @@ -276,4 +294,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { this.numWorkerThreads = numWorkerThreads; } + public int getMaxElementsCreated() { + return this.traceId2trace.getMaxElements(); + } + } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index f1a5f78651a1b3a101a9dc0475b8b524777daacb..070aac3c5d46124469ac7561351bba3142c0f8ae 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -17,7 +17,7 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; -import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; @@ -53,20 +53,20 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { public void init() { super.init(); StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000); - this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); } } @@ -128,19 +128,20 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { } } - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); - private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; - private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + @SuppressWarnings({ "rawtypes", "unchecked" }) public TcpTraceReductionAnalysisWithThreads() { try { - this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); - this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); } catch (NoSuchMethodException e) { throw new IllegalArgumentException(e); @@ -154,13 +155,13 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { final StageWithPort<Void, Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); - CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + // Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); + // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer); - CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + // Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); @@ -222,7 +223,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { public int getNumRecords() { int sum = 0; - for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { + for (Counter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { sum += stage.getNumElementsPassed(); } return sum; @@ -230,7 +231,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { public int getNumTraces() { int sum = 0; - for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { + for (Counter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { sum += stage.getNumElementsPassed(); } return sum;