diff --git a/.classpath b/.classpath index f0fe3356694031424a2c4dfdf88b0a0bc1d92f3b..578cc5c454a7cdd761ed273246aa961539db47ff 100644 --- a/.classpath +++ b/.classpath @@ -2,7 +2,7 @@ <classpath> <classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/> <classpathentry kind="src" path="xtend-gen"/> - <classpathentry kind="src" path="src"/> + <classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="lib" path="lib/rabbitmq-client.jar"/> <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/> diff --git a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java index 2f1f274153288c53b911caf282d3c9423a315514..f46f6703e4767cc19e5e5ae9c75218367ca3a4e2 100644 --- a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java +++ b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java @@ -20,7 +20,6 @@ import java.io.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.Bits; import kieker.analysis.plugin.annotation.*; import kieker.analysis.plugin.filter.AbstractFilterPlugin; import kieker.common.configuration.Configuration; @@ -28,6 +27,7 @@ import kieker.common.logging.Log; import kieker.common.logging.LogFactory; import kieker.common.record.IMonitoringRecord; import com.rabbitmq.client.*; +import explorviz.hpc_monitoring.Bits; import explorviz.hpc_monitoring.StringRegistryRecord; /** diff --git a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..a9eca6ea0af8a72cd192c91bad90a9491681d423 --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java @@ -0,0 +1,346 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.filter; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; + +// import kieker.common.util.ImmutableEntry; + +/** + * An instance of this class computes the throughput in terms of the number of + * events received per time unit. + * + * Note that only one of the input ports should be used in a configuration! + * + * @author Andre van Hoorn, Jan Waller + * + * @since 1.6 + */ +@Plugin(description = "A filter computing the throughput in terms of the number of events received per time unit", outputPorts = { + @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, description = "Provides each incoming object"), + @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, description = "Provides throughput per interval") }, configuration = { + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE), + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, defaultValue = "true") }) +public final class CountingThroughputFilter extends AbstractFilterPlugin { + + /** + * The name of the input port receiving the records. + */ + public static final String INPUT_PORT_NAME_RECORDS = "inputRecords"; + /** + * The name of the input port receiving other objects. + */ + public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects"; + + /** + * The name of the output port delivering the received objects. + */ + public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents"; + public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval"; + + /** The name of the property determining the time unit. */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + /** The name of the property determining the interval size. */ + public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize"; + + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + /** + * If the value is set to false, the intervals are computed based on time + * since 1970-1-1. + */ + public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp"; + + /** + * The configuration property value for + * {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1 + * minute. + */ + public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000"; + + private static final Log LOG = LogFactory + .getLog(CountingThroughputFilter.class); + + private volatile long firstIntervalStart = -1; + private final boolean intervalsBasedOn1stTstamp; + private final TimeUnit timeunit; + + /** + * For a key <i>k</i>, the {@link Queue} stores the number of events + * observed in the time interval <i>(k-intervalSize,k(</i>, i.e., + * the interval <b>excludes</b> the value <i>k</i>. + */ + private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>(); + + private final long intervalSize; + + private final AtomicLong currentCountForCurrentInterval = new AtomicLong( + 0); + + private volatile long firstTimestampInCurrentInterval = -1; // initialized + // with + // the + // first + // incoming + // event + private volatile long lastTimestampInCurrentInterval = -1; // initialized + // with + // the + // first + // incoming + // event + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public CountingThroughputFilter(final Configuration configuration, + final IProjectContext projectContext) { + super(configuration, projectContext); + + final String recordTimeunitProperty = projectContext + .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } + catch (final IllegalArgumentException ex) { // already caught in + // AnalysisController, + // should never happen + LOG.warn(recordTimeunitProperty + + " is no valid TimeUnit! Using NANOSECONDS instead."); + recordTimeunit = TimeUnit.NANOSECONDS; + } + timeunit = recordTimeunit; + + final String configTimeunitProperty = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); + TimeUnit configTimeunit; + try { + configTimeunit = TimeUnit.valueOf(configTimeunitProperty); + } + catch (final IllegalArgumentException ex) { + LOG.warn(configTimeunitProperty + + " is no valid TimeUnit! Using inherited value of " + + timeunit.name() + " instead."); + configTimeunit = timeunit; + } + + intervalSize = timeunit.convert(configuration + .getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE), + configTimeunit); + intervalsBasedOn1stTstamp = configuration + .getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP); + } + + /** + * {@inheritDoc} + */ + @Override + public final Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, + timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE, + Long.toString(intervalSize)); + configuration.setProperty( + CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, + Boolean.toString(intervalsBasedOn1stTstamp)); + return configuration; + } + + private void processEvent(final Object event, final long currentTime) { + final long startOfTimestampsInterval = computeFirstTimestampInInterval(currentTime); + final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime); + + synchronized (this) { + // Check if we need to close the current interval. + if (endOfTimestampsInterval > lastTimestampInCurrentInterval) { + if (firstTimestampInCurrentInterval >= 0) { // don't do this for + // the first record + // (only used for + // initialization of + // variables) + long currentCount = currentCountForCurrentInterval.get(); + // this.eventCountsPerInterval.add( + // new ImmutableEntry<Long, Long>( + // this.lastTimestampInCurrentInterval + 1, + // currentCount)); + super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount); + + // long numIntervalsElapsed = 1; // refined below + // numIntervalsElapsed = (endOfTimestampsInterval - + // this.lastTimestampInCurrentInterval) / this.intervalSize; + // if (numIntervalsElapsed > 1) { // NOPMD + // (AvoidDeeplyNestedIfStmts) + // for (int i = 1; i < numIntervalsElapsed; i++) { + // this.eventCountsPerInterval.add( + // new ImmutableEntry<Long, + // Long>((this.lastTimestampInCurrentInterval + (i * + // this.intervalSize)) + 1, 0L)); + // } + // } + + } + + firstTimestampInCurrentInterval = startOfTimestampsInterval; + lastTimestampInCurrentInterval = endOfTimestampsInterval; + currentCountForCurrentInterval.set(0); + } + + currentCountForCurrentInterval.incrementAndGet(); // only + // incremented in + // synchronized + // blocks + } + super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event); + } + + /** + * This method represents the input port for incoming records. + * + * @param record + * The next record. + */ + // #841 What happens with unordered events (i.e., timestamps before + // firstTimestampInCurrentInterval)? + @InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp") + public final void inputRecord(final IMonitoringRecord record) { + processEvent(record, record.getLoggingTimestamp()); + } + + /** + * This method represents the input port for incoming object. + * + * @param object + * The next object. + */ + @InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, description = "Receives incoming objects to be considered for the throughput computation and uses the current system time") + public final void inputObjects(final Object object) { + processEvent(object, currentTime()); + } + + /** + * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970. + * + * @return The current time + */ + private long currentTime() { + return timeunit.convert(System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + // #840 is this correct? it probably makes more sense to provide a copy. + public Collection<Entry<Long, Long>> getCountsPerInterval() { + return Collections.unmodifiableCollection(eventCountsPerInterval); + } + + /** + * Returns the first timestamp included in the interval that corresponds to + * the given timestamp. + * + * @param timestamp + * + * @return The timestamp in question. + */ + private long computeFirstTimestampInInterval(final long timestamp) { + final long referenceTimePoint; + + if (firstIntervalStart == -1) { + firstIntervalStart = timestamp; + } + + if (intervalsBasedOn1stTstamp) { + referenceTimePoint = firstIntervalStart; + } + else { + referenceTimePoint = 0; + } + + return referenceTimePoint + + (((timestamp - referenceTimePoint) / intervalSize) * intervalSize); + } + + /** + * Returns the last timestamp included in the interval that corresponds to + * the given timestamp. + * + * @param timestamp + * @return The timestamp in question. + */ + private long computeLastTimestampInInterval(final long timestamp) { + final long referenceTimePoint; + if (intervalsBasedOn1stTstamp) { + referenceTimePoint = firstIntervalStart; + } + else { + referenceTimePoint = 0; + } + + return referenceTimePoint + + (((((timestamp - referenceTimePoint) / intervalSize) + 1) * intervalSize) - 1); + } + + /** + * @return the intervalSize + */ + public long getIntervalSize() { + return intervalSize; + } + + /** + * @return the firstTimestampInCurrentInterval -1 if no record processed so + * far + */ + public long getFirstTimestampInCurrentInterval() { + return firstTimestampInCurrentInterval; + } + + /** + * @return the lastTimestampInCurrentInterval -1 if no record processed so + * far + */ + public long getLastTimestampInCurrentInterval() { + return lastTimestampInCurrentInterval; + } + + /** + * @return the currentCountForCurrentInterval + */ + public long getCurrentCountForCurrentInterval() { + return currentCountForCurrentInterval.get(); + } +} diff --git a/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java similarity index 99% rename from src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java rename to src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java index 5bef4cb5776a595f44a29296acff6c3d021f2590..0e92fab443663d624d0cee51de35ba92aec9dff9 100644 --- a/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java @@ -14,7 +14,7 @@ * limitations under the License. ***************************************************************************/ -package explorviz.hpc_monitoring.plugin; +package explorviz.hpc_monitoring.filter; import java.util.*; import java.util.Map.Entry; diff --git a/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java b/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java similarity index 99% rename from src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java rename to src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java index 8f088086c2a951b648895ef16d52e40e3f1e3e6c..1edd4dd66153bb25f83d39d2894dacf11e090290 100644 --- a/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java @@ -1,4 +1,4 @@ -package explorviz.hpc_monitoring.plugin; +package explorviz.hpc_monitoring.filter; import java.util.*; import java.util.concurrent.TimeUnit; diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index eb5ae8185441d2efb7864ed34dbad15df9372f1c..75b72823c60bb4bb2d11e37d58e1a42b8ba96abe 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -4,13 +4,13 @@ import kieker.analysis.AnalysisController import kieker.common.configuration.Configuration import kieker.analysis.IAnalysisController import kieker.analysis.plugin.filter.forward.TeeFilter -import kieker.analysis.plugin.filter.forward.CountingThroughputFilter import kieker.analysis.plugin.reader.timer.TimeReader -import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter -import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter +import explorviz.hpc_monitoring.filter.EventRecordTraceReconstructionFilter +import explorviz.hpc_monitoring.filter.TraceEventRecordAggregationFilter import explorviz.hpc_monitoring.connector.RabbitMQConnector import explorviz.hpc_monitoring.reader.TCPReader +import explorviz.hpc_monitoring.filter.CountingThroughputFilter class WorkerController { @@ -19,56 +19,54 @@ class WorkerController { def start() { analysisInstance = new AnalysisController() - val rabbitMQ = initRabbitMQ() + val tcpReader = initTCPReader() val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() -// val aggregationFilter = initAggregationFilter() -// val timer = initTimer() - val countingThroughputFilter = initCountingThroughputFilter() - val teeFilter = initTeeFilter() -// val rabbitMQConnector = initRabbitMQConnector() + val aggregationFilter = initAggregationFilter() + val timer = initTimer() + val tcpConnector = initTCPConnector() - analysisInstance.connect(rabbitMQ, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, + analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) - - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, - CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - - analysisInstance.connect(countingThroughputFilter, - CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, - TeeFilter::INPUT_PORT_NAME_EVENTS) - -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, -// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) -// -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector, -// RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) -// -// analysisInstance.connect(timer, -// TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, -// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) - - -// analysisInstance.connect(aggregationFilter, -// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector, -// RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) + + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector, + RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) + + analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) + analysisInstance.connect(aggregationFilter, + TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector, + RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) try { analysisInstance.run() } catch (Exception e) { e.printStackTrace } } + + def teeOutput() { + // val countingThroughputFilter = initCountingThroughputFilter() + // val teeFilter = initTeeFilter() + // analysisInstance.connect(eventTraceReconstructionFilter, + // EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, + // CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + // + // analysisInstance.connect(countingThroughputFilter, + // CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, + // TeeFilter::INPUT_PORT_NAME_EVENTS) + } - def initRabbitMQ() { - val rabbitConfig = new Configuration() - rabbitConfig.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1") - new TCPReader(rabbitConfig, analysisInstance) + def initTCPReader() { + val config = new Configuration() + config.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1") + new TCPReader(config, analysisInstance) } - - def initRabbitMQConnector() { + + def initTCPConnector() { val rabbitConfig = new Configuration() rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_PROVIDER, "localhost") rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_QUEUE, "validTraces") @@ -86,7 +84,7 @@ class WorkerController { config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_DELAY_NS, "0") new TimeReader(config, analysisInstance) } - + def initCountingThroughputFilter() { val config = new Configuration() config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") @@ -97,7 +95,7 @@ class WorkerController { val config = new Configuration() new EventRecordTraceReconstructionFilter(config, analysisInstance) } - + def initTeeFilter() { val config = new Configuration() new TeeFilter(config, analysisInstance) diff --git a/src/kieker/analysis/plugin/Bits.java b/src/kieker/analysis/plugin/Bits.java deleted file mode 100644 index 77c943dbc82d4cc4f6a876b25f1296a4b0fbdbfb..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/Bits.java +++ /dev/null @@ -1,84 +0,0 @@ -package kieker.analysis.plugin; - -public class Bits { - - public static boolean getBoolean(byte[] b, int off) { - return b[off] != 0; - } - - public static char getChar(byte[] b, int off) { - return (char) ((b[off + 1] & 0xFF) + (b[off] << 8)); - } - - public static short getShort(byte[] b, int off) { - return (short) ((b[off + 1] & 0xFF) + (b[off] << 8)); - } - - public static int getInt(byte[] b, int off) { - return ((b[off + 3] & 0xFF)) + ((b[off + 2] & 0xFF) << 8) - + ((b[off + 1] & 0xFF) << 16) + ((b[off]) << 24); - } - - public static float getFloat(byte[] b, int off) { - return Float.intBitsToFloat(getInt(b, off)); - } - - public static long getLong(byte[] b, int off) { - return ((b[off + 7] & 0xFFL)) + ((b[off + 6] & 0xFFL) << 8) - + ((b[off + 5] & 0xFFL) << 16) + ((b[off + 4] & 0xFFL) << 24) - + ((b[off + 3] & 0xFFL) << 32) + ((b[off + 2] & 0xFFL) << 40) - + ((b[off + 1] & 0xFFL) << 48) + (((long) b[off]) << 56); - } - - public static double getDouble(byte[] b, int off) { - return Double.longBitsToDouble(getLong(b, off)); - } - - public static byte getByte(byte[] b, int off) { - return b[off]; - } - - public static void putBoolean(byte[] b, int off, boolean val) { - b[off] = (byte) (val ? 1 : 0); - } - - public static void putChar(byte[] b, int off, char val) { - b[off + 1] = (byte) (val); - b[off] = (byte) (val >>> 8); - } - - public static void putShort(byte[] b, int off, short val) { - b[off + 1] = (byte) (val); - b[off] = (byte) (val >>> 8); - } - - public static void putInt(byte[] b, int off, int val) { - b[off + 3] = (byte) (val); - b[off + 2] = (byte) (val >>> 8); - b[off + 1] = (byte) (val >>> 16); - b[off] = (byte) (val >>> 24); - } - - public static void putFloat(byte[] b, int off, float val) { - putInt(b, off, Float.floatToIntBits(val)); - } - - public static void putLong(byte[] b, int off, long val) { - b[off + 7] = (byte) (val); - b[off + 6] = (byte) (val >>> 8); - b[off + 5] = (byte) (val >>> 16); - b[off + 4] = (byte) (val >>> 24); - b[off + 3] = (byte) (val >>> 32); - b[off + 2] = (byte) (val >>> 40); - b[off + 1] = (byte) (val >>> 48); - b[off] = (byte) (val >>> 56); - } - - public static void putDouble(byte[] b, int off, double val) { - putLong(b, off, Double.doubleToLongBits(val)); - } - - public static void putByte(byte[] b, int off, byte val) { - b[off] = val; - } -} diff --git a/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java b/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java deleted file mode 100644 index 5afb3651ac4fa2ca2168f0ada6b5f5bbec4d989e..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java +++ /dev/null @@ -1,308 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.filter.forward; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.InputPort; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.IMonitoringRecord; -//import kieker.common.util.ImmutableEntry; - -/** - * An instance of this class computes the throughput in terms of the number of events received per time unit. - * - * Note that only one of the input ports should be used in a configuration! - * - * @author Andre van Hoorn, Jan Waller - * - * @since 1.6 - */ -@Plugin( - description = "A filter computing the throughput in terms of the number of events received per time unit", - outputPorts = { - @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, - description = "Provides each incoming object"), - @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, - description = "Provides throughput per interval") - }, - configuration = { - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, - defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, - defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE), - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, - defaultValue = "true") - }) -public final class CountingThroughputFilter extends AbstractFilterPlugin { - - /** - * The name of the input port receiving the records. - */ - public static final String INPUT_PORT_NAME_RECORDS = "inputRecords"; - /** - * The name of the input port receiving other objects. - */ - public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects"; - - /** - * The name of the output port delivering the received objects. - */ - public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents"; - public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval"; - - /** The name of the property determining the time unit. */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - /** The name of the property determining the interval size. */ - public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize"; - - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - /** - * If the value is set to false, the intervals are computed based on time since 1970-1-1. - */ - public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp"; - - /** - * The configuration property value for {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1 minute. - */ - public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000"; - - private static final Log LOG = LogFactory.getLog(CountingThroughputFilter.class); - - private volatile long firstIntervalStart = -1; - private final boolean intervalsBasedOn1stTstamp; - private final TimeUnit timeunit; - - /** - * For a key <i>k</i>, the {@link Queue} stores the number of events observed in the time interval <i>(k-intervalSize,k(</i>, i.e., - * the interval <b>excludes</b> the value <i>k</i>. - */ - private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>(); - - private final long intervalSize; - - private final AtomicLong currentCountForCurrentInterval = new AtomicLong(0); - - private volatile long firstTimestampInCurrentInterval = -1; // initialized with the first incoming event - private volatile long lastTimestampInCurrentInterval = -1; // initialized with the first incoming event - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public CountingThroughputFilter(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen - LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); - recordTimeunit = TimeUnit.NANOSECONDS; - } - this.timeunit = recordTimeunit; - - final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); - TimeUnit configTimeunit; - try { - configTimeunit = TimeUnit.valueOf(configTimeunitProperty); - } catch (final IllegalArgumentException ex) { - LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); - configTimeunit = this.timeunit; - } - - this.intervalSize = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE), configTimeunit); - this.intervalsBasedOn1stTstamp = configuration.getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP); - } - - /** - * {@inheritDoc} - */ - @Override - public final Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE, Long.toString(this.intervalSize)); - configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, Boolean.toString(this.intervalsBasedOn1stTstamp)); - return configuration; - } - - private void processEvent(final Object event, final long currentTime) { - final long startOfTimestampsInterval = this.computeFirstTimestampInInterval(currentTime); - final long endOfTimestampsInterval = this.computeLastTimestampInInterval(currentTime); - - synchronized (this) { - // Check if we need to close the current interval. - if (endOfTimestampsInterval > this.lastTimestampInCurrentInterval) { - if (this.firstTimestampInCurrentInterval >= 0) { // don't do this for the first record (only used for initialization of variables) - long currentCount = this.currentCountForCurrentInterval.get(); -// this.eventCountsPerInterval.add( -// new ImmutableEntry<Long, Long>( -// this.lastTimestampInCurrentInterval + 1, -// currentCount)); - super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount); - -// long numIntervalsElapsed = 1; // refined below -// numIntervalsElapsed = (endOfTimestampsInterval - this.lastTimestampInCurrentInterval) / this.intervalSize; -// if (numIntervalsElapsed > 1) { // NOPMD (AvoidDeeplyNestedIfStmts) -// for (int i = 1; i < numIntervalsElapsed; i++) { -// this.eventCountsPerInterval.add( -// new ImmutableEntry<Long, Long>((this.lastTimestampInCurrentInterval + (i * this.intervalSize)) + 1, 0L)); -// } -// } - - } - - this.firstTimestampInCurrentInterval = startOfTimestampsInterval; - this.lastTimestampInCurrentInterval = endOfTimestampsInterval; - this.currentCountForCurrentInterval.set(0); - } - - this.currentCountForCurrentInterval.incrementAndGet(); // only incremented in synchronized blocks - } - super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event); - } - - /** - * This method represents the input port for incoming records. - * - * @param record - * The next record. - */ - // #841 What happens with unordered events (i.e., timestamps before firstTimestampInCurrentInterval)? - @InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, - description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp") - public final void inputRecord(final IMonitoringRecord record) { - this.processEvent(record, record.getLoggingTimestamp()); - } - - /** - * This method represents the input port for incoming object. - * - * @param object - * The next object. - */ - @InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, - description = "Receives incoming objects to be considered for the throughput computation and uses the current system time") - public final void inputObjects(final Object object) { - this.processEvent(object, this.currentTime()); - } - - /** - * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970. - * - * @return The current time - */ - private long currentTime() { - return this.timeunit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); - } - - // #840 is this correct? it probably makes more sense to provide a copy. - public Collection<Entry<Long, Long>> getCountsPerInterval() { - return Collections.unmodifiableCollection(this.eventCountsPerInterval); - } - - /** - * Returns the first timestamp included in the interval that corresponds to the given timestamp. - * - * @param timestamp - * - * @return The timestamp in question. - */ - private long computeFirstTimestampInInterval(final long timestamp) { - final long referenceTimePoint; - - if (this.firstIntervalStart == -1) { - this.firstIntervalStart = timestamp; - } - - if (this.intervalsBasedOn1stTstamp) { - referenceTimePoint = this.firstIntervalStart; - } else { - referenceTimePoint = 0; - } - - return referenceTimePoint + (((timestamp - referenceTimePoint) / this.intervalSize) * this.intervalSize); - } - - /** - * Returns the last timestamp included in the interval that corresponds to the given timestamp. - * - * @param timestamp - * @return The timestamp in question. - */ - private long computeLastTimestampInInterval(final long timestamp) { - final long referenceTimePoint; - if (this.intervalsBasedOn1stTstamp) { - referenceTimePoint = this.firstIntervalStart; - } else { - referenceTimePoint = 0; - } - - return referenceTimePoint + (((((timestamp - referenceTimePoint) / this.intervalSize) + 1) * this.intervalSize) - 1); - } - - /** - * @return the intervalSize - */ - public long getIntervalSize() { - return this.intervalSize; - } - - /** - * @return the firstTimestampInCurrentInterval -1 if no record processed so far - */ - public long getFirstTimestampInCurrentInterval() { - return this.firstTimestampInCurrentInterval; - } - - /** - * @return the lastTimestampInCurrentInterval -1 if no record processed so far - */ - public long getLastTimestampInCurrentInterval() { - return this.lastTimestampInCurrentInterval; - } - - /** - * @return the currentCountForCurrentInterval - */ - public long getCurrentCountForCurrentInterval() { - return this.currentCountForCurrentInterval.get(); - } -} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java index a4d0323e04dc3db6fd84d5f3c78e82a2bc0aed34..4767e8243282dbc70e628797af8fe44a9fc582d4 100644 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.Bits; import kieker.analysis.plugin.annotation.*; import kieker.analysis.plugin.reader.AbstractReaderPlugin; import kieker.common.configuration.Configuration; @@ -31,6 +30,7 @@ import kieker.common.logging.LogFactory; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; import com.rabbitmq.client.*; +import explorviz.hpc_monitoring.Bits; /** * Reads monitoring records from the queue of an established RabbitMQ