From 5402bf386a6ed44e9f01af53979938c9a77a5503 Mon Sep 17 00:00:00 2001
From: Florian Fittkau <ffi@informatik.uni-kiel.de>
Date: Fri, 9 Aug 2013 12:59:07 +0200
Subject: [PATCH] minor

---
 .classpath                                    |   2 +-
 .../connector/RabbitMQConnector.java          |   2 +-
 .../filter/CountingThroughputFilter.java      | 346 ++++++++++++++++++
 .../EventRecordTraceReconstructionFilter.java |   2 +-
 .../TraceEventRecordAggregationFilter.java    |   2 +-
 .../worker/main/WorkerController.xtend        |  84 +++--
 src/kieker/analysis/plugin/Bits.java          |  84 -----
 .../forward/CountingThroughputFilter.java     | 308 ----------------
 .../plugin/reader/mq/RabbitMQReader.java      |   2 +-
 9 files changed, 392 insertions(+), 440 deletions(-)
 create mode 100644 src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java
 rename src/explorviz/hpc_monitoring/{plugin => filter}/EventRecordTraceReconstructionFilter.java (99%)
 rename src/explorviz/hpc_monitoring/{plugin => filter}/TraceEventRecordAggregationFilter.java (99%)
 delete mode 100644 src/kieker/analysis/plugin/Bits.java
 delete mode 100644 src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java

diff --git a/.classpath b/.classpath
index f0fe335..578cc5c 100644
--- a/.classpath
+++ b/.classpath
@@ -2,7 +2,7 @@
 <classpath>
 	<classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/>
 	<classpathentry kind="src" path="xtend-gen"/>
-	<classpathentry kind="src" path="src"/>
+	<classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="lib/rabbitmq-client.jar"/>
 	<classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/>
diff --git a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java
index 2f1f274..f46f670 100644
--- a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java
+++ b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java
@@ -20,7 +20,6 @@ import java.io.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import kieker.analysis.IProjectContext;
-import kieker.analysis.plugin.Bits;
 import kieker.analysis.plugin.annotation.*;
 import kieker.analysis.plugin.filter.AbstractFilterPlugin;
 import kieker.common.configuration.Configuration;
@@ -28,6 +27,7 @@ import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 import kieker.common.record.IMonitoringRecord;
 import com.rabbitmq.client.*;
+import explorviz.hpc_monitoring.Bits;
 import explorviz.hpc_monitoring.StringRegistryRecord;
 
 /**
diff --git a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java
new file mode 100644
index 0000000..a9eca6e
--- /dev/null
+++ b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java
@@ -0,0 +1,346 @@
+/***************************************************************************
+ * Copyright 2013 Kieker Project (http://kieker-monitoring.net)
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package explorviz.hpc_monitoring.filter;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import kieker.analysis.IProjectContext;
+import kieker.analysis.plugin.annotation.*;
+import kieker.analysis.plugin.filter.AbstractFilterPlugin;
+import kieker.common.configuration.Configuration;
+import kieker.common.logging.Log;
+import kieker.common.logging.LogFactory;
+import kieker.common.record.IMonitoringRecord;
+
+// import kieker.common.util.ImmutableEntry;
+
+/**
+ * An instance of this class computes the throughput in terms of the number of
+ * events received per time unit.
+ * 
+ * Note that only one of the input ports should be used in a configuration!
+ * 
+ * @author Andre van Hoorn, Jan Waller
+ * 
+ * @since 1.6
+ */
+@Plugin(description = "A filter computing the throughput in terms of the number of events received per time unit", outputPorts = {
+        @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, description = "Provides each incoming object"),
+        @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, description = "Provides throughput per interval") }, configuration = {
+        @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
+        @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE),
+        @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, defaultValue = "true") })
+public final class CountingThroughputFilter extends AbstractFilterPlugin {
+
+    /**
+     * The name of the input port receiving the records.
+     */
+    public static final String             INPUT_PORT_NAME_RECORDS                            = "inputRecords";
+    /**
+     * The name of the input port receiving other objects.
+     */
+    public static final String             INPUT_PORT_NAME_OBJECTS                            = "inputObjects";
+
+    /**
+     * The name of the output port delivering the received objects.
+     */
+    public static final String             OUTPUT_PORT_NAME_RELAYED_OBJECTS                   = "relayedEvents";
+    public static final String             OUTPUT_PORT_NAME_THROUGHPUT                        = "throughputPerInterval";
+
+    /** The name of the property determining the time unit. */
+    public static final String             CONFIG_PROPERTY_NAME_TIMEUNIT                      = "timeunit";
+    /** The name of the property determining the interval size. */
+    public static final String             CONFIG_PROPERTY_NAME_INTERVAL_SIZE                 = "intervalSize";
+
+    /**
+     * The default value of the time unit property (nanoseconds).
+     */
+    public static final String             CONFIG_PROPERTY_VALUE_TIMEUNIT                     = "NANOSECONDS";                                 // TimeUnit.NANOSECONDS.name()
+
+    /**
+     * If the value is set to false, the intervals are computed based on time
+     * since 1970-1-1.
+     */
+    public static final String             CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp";
+
+    /**
+     * The configuration property value for
+     * {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1
+     * minute.
+     */
+    public static final String             CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE     = "60000000000";
+
+    private static final Log               LOG                                                = LogFactory
+                                                                                                      .getLog(CountingThroughputFilter.class);
+
+    private volatile long                  firstIntervalStart                                 = -1;
+    private final boolean                  intervalsBasedOn1stTstamp;
+    private final TimeUnit                 timeunit;
+
+    /**
+     * For a key <i>k</i>, the {@link Queue} stores the number of events
+     * observed in the time interval <i>(k-intervalSize,k(</i>, i.e.,
+     * the interval <b>excludes</b> the value <i>k</i>.
+     */
+    private final Queue<Entry<Long, Long>> eventCountsPerInterval                             = new ConcurrentLinkedQueue<Entry<Long, Long>>();
+
+    private final long                     intervalSize;
+
+    private final AtomicLong               currentCountForCurrentInterval                     = new AtomicLong(
+                                                                                                      0);
+
+    private volatile long                  firstTimestampInCurrentInterval                    = -1;                                            // initialized
+                                                                                                                                                // with
+                                                                                                                                                // the
+                                                                                                                                                // first
+                                                                                                                                                // incoming
+                                                                                                                                                // event
+    private volatile long                  lastTimestampInCurrentInterval                     = -1;                                            // initialized
+                                                                                                                                                // with
+                                                                                                                                                // the
+                                                                                                                                                // first
+                                                                                                                                                // incoming
+                                                                                                                                                // event
+
+    /**
+     * Creates a new instance of this class using the given parameters.
+     * 
+     * @param configuration
+     *            The configuration for this component.
+     * @param projectContext
+     *            The project context for this component.
+     */
+    public CountingThroughputFilter(final Configuration configuration,
+            final IProjectContext projectContext) {
+        super(configuration, projectContext);
+
+        final String recordTimeunitProperty = projectContext
+                .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
+        TimeUnit recordTimeunit;
+        try {
+            recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
+        }
+        catch (final IllegalArgumentException ex) { // already caught in
+                                                    // AnalysisController,
+                                                    // should never happen
+            LOG.warn(recordTimeunitProperty
+                    + " is no valid TimeUnit! Using NANOSECONDS instead.");
+            recordTimeunit = TimeUnit.NANOSECONDS;
+        }
+        timeunit = recordTimeunit;
+
+        final String configTimeunitProperty = configuration
+                .getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
+        TimeUnit configTimeunit;
+        try {
+            configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
+        }
+        catch (final IllegalArgumentException ex) {
+            LOG.warn(configTimeunitProperty
+                    + " is no valid TimeUnit! Using inherited value of "
+                    + timeunit.name() + " instead.");
+            configTimeunit = timeunit;
+        }
+
+        intervalSize = timeunit.convert(configuration
+                .getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE),
+                configTimeunit);
+        intervalsBasedOn1stTstamp = configuration
+                .getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public final Configuration getCurrentConfiguration() {
+        final Configuration configuration = new Configuration();
+        configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT,
+                timeunit.name());
+        configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE,
+                Long.toString(intervalSize));
+        configuration.setProperty(
+                CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP,
+                Boolean.toString(intervalsBasedOn1stTstamp));
+        return configuration;
+    }
+
+    private void processEvent(final Object event, final long currentTime) {
+        final long startOfTimestampsInterval = computeFirstTimestampInInterval(currentTime);
+        final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime);
+
+        synchronized (this) {
+            // Check if we need to close the current interval.
+            if (endOfTimestampsInterval > lastTimestampInCurrentInterval) {
+                if (firstTimestampInCurrentInterval >= 0) { // don't do this for
+                                                            // the first record
+                                                            // (only used for
+                                                            // initialization of
+                                                            // variables)
+                    long currentCount = currentCountForCurrentInterval.get();
+                    // this.eventCountsPerInterval.add(
+                    // new ImmutableEntry<Long, Long>(
+                    // this.lastTimestampInCurrentInterval + 1,
+                    // currentCount));
+                    super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount);
+
+                    // long numIntervalsElapsed = 1; // refined below
+                    // numIntervalsElapsed = (endOfTimestampsInterval -
+                    // this.lastTimestampInCurrentInterval) / this.intervalSize;
+                    // if (numIntervalsElapsed > 1) { // NOPMD
+                    // (AvoidDeeplyNestedIfStmts)
+                    // for (int i = 1; i < numIntervalsElapsed; i++) {
+                    // this.eventCountsPerInterval.add(
+                    // new ImmutableEntry<Long,
+                    // Long>((this.lastTimestampInCurrentInterval + (i *
+                    // this.intervalSize)) + 1, 0L));
+                    // }
+                    // }
+
+                }
+
+                firstTimestampInCurrentInterval = startOfTimestampsInterval;
+                lastTimestampInCurrentInterval = endOfTimestampsInterval;
+                currentCountForCurrentInterval.set(0);
+            }
+
+            currentCountForCurrentInterval.incrementAndGet(); // only
+                                                              // incremented in
+                                                              // synchronized
+                                                              // blocks
+        }
+        super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event);
+    }
+
+    /**
+     * This method represents the input port for incoming records.
+     * 
+     * @param record
+     *            The next record.
+     */
+    // #841 What happens with unordered events (i.e., timestamps before
+    // firstTimestampInCurrentInterval)?
+    @InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp")
+    public final void inputRecord(final IMonitoringRecord record) {
+        processEvent(record, record.getLoggingTimestamp());
+    }
+
+    /**
+     * This method represents the input port for incoming object.
+     * 
+     * @param object
+     *            The next object.
+     */
+    @InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, description = "Receives incoming objects to be considered for the throughput computation and uses the current system time")
+    public final void inputObjects(final Object object) {
+        processEvent(object, currentTime());
+    }
+
+    /**
+     * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970.
+     * 
+     * @return The current time
+     */
+    private long currentTime() {
+        return timeunit.convert(System.currentTimeMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    // #840 is this correct? it probably makes more sense to provide a copy.
+    public Collection<Entry<Long, Long>> getCountsPerInterval() {
+        return Collections.unmodifiableCollection(eventCountsPerInterval);
+    }
+
+    /**
+     * Returns the first timestamp included in the interval that corresponds to
+     * the given timestamp.
+     * 
+     * @param timestamp
+     * 
+     * @return The timestamp in question.
+     */
+    private long computeFirstTimestampInInterval(final long timestamp) {
+        final long referenceTimePoint;
+
+        if (firstIntervalStart == -1) {
+            firstIntervalStart = timestamp;
+        }
+
+        if (intervalsBasedOn1stTstamp) {
+            referenceTimePoint = firstIntervalStart;
+        }
+        else {
+            referenceTimePoint = 0;
+        }
+
+        return referenceTimePoint
+                + (((timestamp - referenceTimePoint) / intervalSize) * intervalSize);
+    }
+
+    /**
+     * Returns the last timestamp included in the interval that corresponds to
+     * the given timestamp.
+     * 
+     * @param timestamp
+     * @return The timestamp in question.
+     */
+    private long computeLastTimestampInInterval(final long timestamp) {
+        final long referenceTimePoint;
+        if (intervalsBasedOn1stTstamp) {
+            referenceTimePoint = firstIntervalStart;
+        }
+        else {
+            referenceTimePoint = 0;
+        }
+
+        return referenceTimePoint
+                + (((((timestamp - referenceTimePoint) / intervalSize) + 1) * intervalSize) - 1);
+    }
+
+    /**
+     * @return the intervalSize
+     */
+    public long getIntervalSize() {
+        return intervalSize;
+    }
+
+    /**
+     * @return the firstTimestampInCurrentInterval -1 if no record processed so
+     *         far
+     */
+    public long getFirstTimestampInCurrentInterval() {
+        return firstTimestampInCurrentInterval;
+    }
+
+    /**
+     * @return the lastTimestampInCurrentInterval -1 if no record processed so
+     *         far
+     */
+    public long getLastTimestampInCurrentInterval() {
+        return lastTimestampInCurrentInterval;
+    }
+
+    /**
+     * @return the currentCountForCurrentInterval
+     */
+    public long getCurrentCountForCurrentInterval() {
+        return currentCountForCurrentInterval.get();
+    }
+}
diff --git a/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java
similarity index 99%
rename from src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java
rename to src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java
index 5bef4cb..0e92fab 100644
--- a/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java
+++ b/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  ***************************************************************************/
 
-package explorviz.hpc_monitoring.plugin;
+package explorviz.hpc_monitoring.filter;
 
 import java.util.*;
 import java.util.Map.Entry;
diff --git a/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java b/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java
similarity index 99%
rename from src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java
rename to src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java
index 8f08808..1edd4dd 100644
--- a/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java
+++ b/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java
@@ -1,4 +1,4 @@
-package explorviz.hpc_monitoring.plugin;
+package explorviz.hpc_monitoring.filter;
 
 import java.util.*;
 import java.util.concurrent.TimeUnit;
diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend
index eb5ae81..75b7282 100644
--- a/src/explorviz/worker/main/WorkerController.xtend
+++ b/src/explorviz/worker/main/WorkerController.xtend
@@ -4,13 +4,13 @@ import kieker.analysis.AnalysisController
 import kieker.common.configuration.Configuration
 import kieker.analysis.IAnalysisController
 import kieker.analysis.plugin.filter.forward.TeeFilter
-import kieker.analysis.plugin.filter.forward.CountingThroughputFilter
 
 import kieker.analysis.plugin.reader.timer.TimeReader
-import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter
-import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter
+import explorviz.hpc_monitoring.filter.EventRecordTraceReconstructionFilter
+import explorviz.hpc_monitoring.filter.TraceEventRecordAggregationFilter
 import explorviz.hpc_monitoring.connector.RabbitMQConnector
 import explorviz.hpc_monitoring.reader.TCPReader
+import explorviz.hpc_monitoring.filter.CountingThroughputFilter
 
 class WorkerController {
 
@@ -19,56 +19,54 @@ class WorkerController {
     def start() {
         analysisInstance = new AnalysisController()
 
-        val rabbitMQ = initRabbitMQ()
+        val tcpReader = initTCPReader()
         val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter()
-//        val aggregationFilter = initAggregationFilter()
-//        val timer = initTimer()
-        val countingThroughputFilter = initCountingThroughputFilter()
-        val teeFilter = initTeeFilter()
-//        val rabbitMQConnector = initRabbitMQConnector()
+        val aggregationFilter = initAggregationFilter()
+        val timer = initTimer()
+        val tcpConnector = initTCPConnector()
 
-        analysisInstance.connect(rabbitMQ, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
+        analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
             EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
-            
-                    analysisInstance.connect(eventTraceReconstructionFilter,
-            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
-            CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
-            
-                    analysisInstance.connect(countingThroughputFilter,
-            CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
-            TeeFilter::INPUT_PORT_NAME_EVENTS)
-            
-//        analysisInstance.connect(eventTraceReconstructionFilter,
-//            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
-//            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
-//
-//        analysisInstance.connect(eventTraceReconstructionFilter,
-//            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector,
-//            RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
-//
-//        analysisInstance.connect(timer,
-//            TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
-//            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
-            
-
-//        analysisInstance.connect(aggregationFilter,
-//            TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector,
-//            RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
 
+        analysisInstance.connect(eventTraceReconstructionFilter,
+            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
+            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
+
+        analysisInstance.connect(eventTraceReconstructionFilter,
+            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector,
+            RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
+
+        analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
+            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
+        analysisInstance.connect(aggregationFilter,
+            TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector,
+            RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
         try {
             analysisInstance.run()
         } catch (Exception e) {
             e.printStackTrace
         }
     }
+    
+    def teeOutput() {
+        //        val countingThroughputFilter = initCountingThroughputFilter()
+        //        val teeFilter = initTeeFilter()
+                //        analysisInstance.connect(eventTraceReconstructionFilter,
+        //            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
+        //            CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
+        //
+        //        analysisInstance.connect(countingThroughputFilter,
+        //            CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
+        //            TeeFilter::INPUT_PORT_NAME_EVENTS)
+    }
 
-    def initRabbitMQ() {
-        val rabbitConfig = new Configuration()
-        rabbitConfig.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1")
-        new TCPReader(rabbitConfig, analysisInstance)
+    def initTCPReader() {
+        val config = new Configuration()
+        config.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1")
+        new TCPReader(config, analysisInstance)
     }
-    
-    def initRabbitMQConnector() {
+
+    def initTCPConnector() {
         val rabbitConfig = new Configuration()
         rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_PROVIDER, "localhost")
         rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_QUEUE, "validTraces")
@@ -86,7 +84,7 @@ class WorkerController {
         config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_DELAY_NS, "0")
         new TimeReader(config, analysisInstance)
     }
-    
+
     def initCountingThroughputFilter() {
         val config = new Configuration()
         config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000")
@@ -97,7 +95,7 @@ class WorkerController {
         val config = new Configuration()
         new EventRecordTraceReconstructionFilter(config, analysisInstance)
     }
-    
+
     def initTeeFilter() {
         val config = new Configuration()
         new TeeFilter(config, analysisInstance)
diff --git a/src/kieker/analysis/plugin/Bits.java b/src/kieker/analysis/plugin/Bits.java
deleted file mode 100644
index 77c943d..0000000
--- a/src/kieker/analysis/plugin/Bits.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package kieker.analysis.plugin;
-
-public class Bits {
-
-    public static boolean getBoolean(byte[] b, int off) {
-        return b[off] != 0;
-    }
-
-    public static char getChar(byte[] b, int off) {
-        return (char) ((b[off + 1] & 0xFF) + (b[off] << 8));
-    }
-
-    public static short getShort(byte[] b, int off) {
-        return (short) ((b[off + 1] & 0xFF) + (b[off] << 8));
-    }
-
-    public static int getInt(byte[] b, int off) {
-        return ((b[off + 3] & 0xFF)) + ((b[off + 2] & 0xFF) << 8)
-                + ((b[off + 1] & 0xFF) << 16) + ((b[off]) << 24);
-    }
-
-    public static float getFloat(byte[] b, int off) {
-        return Float.intBitsToFloat(getInt(b, off));
-    }
-
-    public static long getLong(byte[] b, int off) {
-        return ((b[off + 7] & 0xFFL)) + ((b[off + 6] & 0xFFL) << 8)
-                + ((b[off + 5] & 0xFFL) << 16) + ((b[off + 4] & 0xFFL) << 24)
-                + ((b[off + 3] & 0xFFL) << 32) + ((b[off + 2] & 0xFFL) << 40)
-                + ((b[off + 1] & 0xFFL) << 48) + (((long) b[off]) << 56);
-    }
-
-    public static double getDouble(byte[] b, int off) {
-        return Double.longBitsToDouble(getLong(b, off));
-    }
-
-    public static byte getByte(byte[] b, int off) {
-        return b[off];
-    }
-
-    public static void putBoolean(byte[] b, int off, boolean val) {
-        b[off] = (byte) (val ? 1 : 0);
-    }
-
-    public static void putChar(byte[] b, int off, char val) {
-        b[off + 1] = (byte) (val);
-        b[off] = (byte) (val >>> 8);
-    }
-
-    public static void putShort(byte[] b, int off, short val) {
-        b[off + 1] = (byte) (val);
-        b[off] = (byte) (val >>> 8);
-    }
-
-    public static void putInt(byte[] b, int off, int val) {
-        b[off + 3] = (byte) (val);
-        b[off + 2] = (byte) (val >>> 8);
-        b[off + 1] = (byte) (val >>> 16);
-        b[off] = (byte) (val >>> 24);
-    }
-
-    public static void putFloat(byte[] b, int off, float val) {
-        putInt(b, off, Float.floatToIntBits(val));
-    }
-
-    public static void putLong(byte[] b, int off, long val) {
-        b[off + 7] = (byte) (val);
-        b[off + 6] = (byte) (val >>> 8);
-        b[off + 5] = (byte) (val >>> 16);
-        b[off + 4] = (byte) (val >>> 24);
-        b[off + 3] = (byte) (val >>> 32);
-        b[off + 2] = (byte) (val >>> 40);
-        b[off + 1] = (byte) (val >>> 48);
-        b[off] = (byte) (val >>> 56);
-    }
-
-    public static void putDouble(byte[] b, int off, double val) {
-        putLong(b, off, Double.doubleToLongBits(val));
-    }
-
-    public static void putByte(byte[] b, int off, byte val) {
-        b[off] = val;
-    }
-}
diff --git a/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java b/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java
deleted file mode 100644
index 5afb365..0000000
--- a/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/***************************************************************************
- * Copyright 2013 Kieker Project (http://kieker-monitoring.net)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ***************************************************************************/
-
-package kieker.analysis.plugin.filter.forward;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import kieker.analysis.IProjectContext;
-import kieker.analysis.plugin.annotation.InputPort;
-import kieker.analysis.plugin.annotation.OutputPort;
-import kieker.analysis.plugin.annotation.Plugin;
-import kieker.analysis.plugin.annotation.Property;
-import kieker.analysis.plugin.filter.AbstractFilterPlugin;
-import kieker.common.configuration.Configuration;
-import kieker.common.logging.Log;
-import kieker.common.logging.LogFactory;
-import kieker.common.record.IMonitoringRecord;
-//import kieker.common.util.ImmutableEntry;
-
-/**
- * An instance of this class computes the throughput in terms of the number of events received per time unit.
- * 
- * Note that only one of the input ports should be used in a configuration!
- * 
- * @author Andre van Hoorn, Jan Waller
- * 
- * @since 1.6
- */
-@Plugin(
-		description = "A filter computing the throughput in terms of the number of events received per time unit",
-		outputPorts = {
-			@OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class },
-					description = "Provides each incoming object"),
-			@OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class },
-			description = "Provides throughput per interval")
-		},
-		configuration = {
-			@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT,
-					defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
-			@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE,
-					defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE),
-			@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP,
-					defaultValue = "true")
-		})
-public final class CountingThroughputFilter extends AbstractFilterPlugin {
-
-	/**
-	 * The name of the input port receiving the records.
-	 */
-	public static final String INPUT_PORT_NAME_RECORDS = "inputRecords";
-	/**
-	 * The name of the input port receiving other objects.
-	 */
-	public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects";
-
-	/**
-	 * The name of the output port delivering the received objects.
-	 */
-	public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents";
-	public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval";
-
-	/** The name of the property determining the time unit. */
-	public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
-	/** The name of the property determining the interval size. */
-	public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize";
-
-	/**
-	 * The default value of the time unit property (nanoseconds).
-	 */
-	public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
-
-	/**
-	 * If the value is set to false, the intervals are computed based on time since 1970-1-1.
-	 */
-	public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp";
-
-	/**
-	 * The configuration property value for {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1 minute.
-	 */
-	public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000";
-
-	private static final Log LOG = LogFactory.getLog(CountingThroughputFilter.class);
-
-	private volatile long firstIntervalStart = -1;
-	private final boolean intervalsBasedOn1stTstamp;
-	private final TimeUnit timeunit;
-
-	/**
-	 * For a key <i>k</i>, the {@link Queue} stores the number of events observed in the time interval <i>(k-intervalSize,k(</i>, i.e.,
-	 * the interval <b>excludes</b> the value <i>k</i>.
-	 */
-	private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>();
-
-	private final long intervalSize;
-
-	private final AtomicLong currentCountForCurrentInterval = new AtomicLong(0);
-
-	private volatile long firstTimestampInCurrentInterval = -1; // initialized with the first incoming event
-	private volatile long lastTimestampInCurrentInterval = -1; // initialized with the first incoming event
-
-	/**
-	 * Creates a new instance of this class using the given parameters.
-	 * 
-	 * @param configuration
-	 *            The configuration for this component.
-	 * @param projectContext
-	 *            The project context for this component.
-	 */
-	public CountingThroughputFilter(final Configuration configuration, final IProjectContext projectContext) {
-		super(configuration, projectContext);
-
-		final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
-		TimeUnit recordTimeunit;
-		try {
-			recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
-		} catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen
-			LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead.");
-			recordTimeunit = TimeUnit.NANOSECONDS;
-		}
-		this.timeunit = recordTimeunit;
-
-		final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
-		TimeUnit configTimeunit;
-		try {
-			configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
-		} catch (final IllegalArgumentException ex) {
-			LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead.");
-			configTimeunit = this.timeunit;
-		}
-
-		this.intervalSize = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE), configTimeunit);
-		this.intervalsBasedOn1stTstamp = configuration.getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public final Configuration getCurrentConfiguration() {
-		final Configuration configuration = new Configuration();
-		configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name());
-		configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE, Long.toString(this.intervalSize));
-		configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, Boolean.toString(this.intervalsBasedOn1stTstamp));
-		return configuration;
-	}
-
-	private void processEvent(final Object event, final long currentTime) {
-		final long startOfTimestampsInterval = this.computeFirstTimestampInInterval(currentTime);
-		final long endOfTimestampsInterval = this.computeLastTimestampInInterval(currentTime);
-
-		synchronized (this) {
-			// Check if we need to close the current interval.
-			if (endOfTimestampsInterval > this.lastTimestampInCurrentInterval) {
-				if (this.firstTimestampInCurrentInterval >= 0) { // don't do this for the first record (only used for initialization of variables)
-					long currentCount = this.currentCountForCurrentInterval.get();
-//					this.eventCountsPerInterval.add(
-//							new ImmutableEntry<Long, Long>(
-//									this.lastTimestampInCurrentInterval + 1,
-//									currentCount));
-					super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount);
-
-//					long numIntervalsElapsed = 1; // refined below
-//					numIntervalsElapsed = (endOfTimestampsInterval - this.lastTimestampInCurrentInterval) / this.intervalSize;
-//					if (numIntervalsElapsed > 1) { // NOPMD (AvoidDeeplyNestedIfStmts)
-//						for (int i = 1; i < numIntervalsElapsed; i++) {
-//							this.eventCountsPerInterval.add(
-//									new ImmutableEntry<Long, Long>((this.lastTimestampInCurrentInterval + (i * this.intervalSize)) + 1, 0L));
-//						}
-//					}
-
-				}
-
-				this.firstTimestampInCurrentInterval = startOfTimestampsInterval;
-				this.lastTimestampInCurrentInterval = endOfTimestampsInterval;
-				this.currentCountForCurrentInterval.set(0);
-			}
-
-			this.currentCountForCurrentInterval.incrementAndGet(); // only incremented in synchronized blocks
-		}
-		super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event);
-	}
-
-	/**
-	 * This method represents the input port for incoming records.
-	 * 
-	 * @param record
-	 *            The next record.
-	 */
-	// #841 What happens with unordered events (i.e., timestamps before firstTimestampInCurrentInterval)?
-	@InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class },
-			description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp")
-	public final void inputRecord(final IMonitoringRecord record) {
-		this.processEvent(record, record.getLoggingTimestamp());
-	}
-
-	/**
-	 * This method represents the input port for incoming object.
-	 * 
-	 * @param object
-	 *            The next object.
-	 */
-	@InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class },
-			description = "Receives incoming objects to be considered for the throughput computation and uses the current system time")
-	public final void inputObjects(final Object object) {
-		this.processEvent(object, this.currentTime());
-	}
-
-	/**
-	 * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970.
-	 * 
-	 * @return The current time
-	 */
-	private long currentTime() {
-		return this.timeunit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-	}
-
-	// #840 is this correct? it probably makes more sense to provide a copy.
-	public Collection<Entry<Long, Long>> getCountsPerInterval() {
-		return Collections.unmodifiableCollection(this.eventCountsPerInterval);
-	}
-
-	/**
-	 * Returns the first timestamp included in the interval that corresponds to the given timestamp.
-	 * 
-	 * @param timestamp
-	 * 
-	 * @return The timestamp in question.
-	 */
-	private long computeFirstTimestampInInterval(final long timestamp) {
-		final long referenceTimePoint;
-
-		if (this.firstIntervalStart == -1) {
-			this.firstIntervalStart = timestamp;
-		}
-
-		if (this.intervalsBasedOn1stTstamp) {
-			referenceTimePoint = this.firstIntervalStart;
-		} else {
-			referenceTimePoint = 0;
-		}
-
-		return referenceTimePoint + (((timestamp - referenceTimePoint) / this.intervalSize) * this.intervalSize);
-	}
-
-	/**
-	 * Returns the last timestamp included in the interval that corresponds to the given timestamp.
-	 * 
-	 * @param timestamp
-	 * @return The timestamp in question.
-	 */
-	private long computeLastTimestampInInterval(final long timestamp) {
-		final long referenceTimePoint;
-		if (this.intervalsBasedOn1stTstamp) {
-			referenceTimePoint = this.firstIntervalStart;
-		} else {
-			referenceTimePoint = 0;
-		}
-
-		return referenceTimePoint + (((((timestamp - referenceTimePoint) / this.intervalSize) + 1) * this.intervalSize) - 1);
-	}
-
-	/**
-	 * @return the intervalSize
-	 */
-	public long getIntervalSize() {
-		return this.intervalSize;
-	}
-
-	/**
-	 * @return the firstTimestampInCurrentInterval -1 if no record processed so far
-	 */
-	public long getFirstTimestampInCurrentInterval() {
-		return this.firstTimestampInCurrentInterval;
-	}
-
-	/**
-	 * @return the lastTimestampInCurrentInterval -1 if no record processed so far
-	 */
-	public long getLastTimestampInCurrentInterval() {
-		return this.lastTimestampInCurrentInterval;
-	}
-
-	/**
-	 * @return the currentCountForCurrentInterval
-	 */
-	public long getCurrentCountForCurrentInterval() {
-		return this.currentCountForCurrentInterval.get();
-	}
-}
diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java
index a4d0323..4767e82 100644
--- a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java
+++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import kieker.analysis.IProjectContext;
-import kieker.analysis.plugin.Bits;
 import kieker.analysis.plugin.annotation.*;
 import kieker.analysis.plugin.reader.AbstractReaderPlugin;
 import kieker.common.configuration.Configuration;
@@ -31,6 +30,7 @@ import kieker.common.logging.LogFactory;
 import kieker.common.record.AbstractMonitoringRecord;
 import kieker.common.record.IMonitoringRecord;
 import com.rabbitmq.client.*;
+import explorviz.hpc_monitoring.Bits;
 
 /**
  * Reads monitoring records from the queue of an established RabbitMQ
-- 
GitLab