diff --git a/.classpath b/.classpath index e5536a4fdcf434c6206788e092e637f081337596..4d76e2942af91530759af460bbce2ef4c614fd16 100644 --- a/.classpath +++ b/.classpath @@ -1,8 +1,10 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> + <classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/> + <classpathentry kind="src" path="xtend-gen"/> <classpathentry excluding="kieker/analysis/plugin/filter/flow/" kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> - <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/> <classpathentry kind="lib" path="lib/rabbitmq-client.jar"/> + <classpathentry combineaccessrules="false" kind="src" path="/kieker"/> <classpathentry kind="output" path="bin"/> </classpath> diff --git a/lib/kieker-1.8-SNAPSHOT.jar b/lib/kieker-1.8-SNAPSHOT.jar index 9e9e153746367f8ac7c41f68b455097d0b23d165..81509a16d0eef4a09917e5bd724a21677dd5c516 100644 Binary files a/lib/kieker-1.8-SNAPSHOT.jar and b/lib/kieker-1.8-SNAPSHOT.jar differ diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend new file mode 100644 index 0000000000000000000000000000000000000000..5611f7d3cc2cc1627f6a760817ca697bbb5a7c69 --- /dev/null +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -0,0 +1,55 @@ +package explorviz.worker.main + +import kieker.analysis.AnalysisController +import kieker.common.configuration.Configuration +import kieker.analysis.IAnalysisController +import kieker.analysis.plugin.reader.mq.RabbitMQReader +import kieker.analysis.plugin.filter.forward.CountingFilter +import kieker.analysis.plugin.filter.forward.TeeFilter +import kieker.analysis.plugin.filter.forward.CountingThroughputFilter +import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter + +class WorkerController { + + var IAnalysisController analysisInstance + + def start() { + analysisInstance = new AnalysisController() + + val rabbitMQ = initRabbitMQ() + + val config = new Configuration() +// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") +// val countingFilter = new CountingThroughputFilter(config, analysisInstance) + val teeFilter = initTeeFilter() + + val eventTraceReconstructionFilter = + new EventRecordTraceReconstructionFilter(config, analysisInstance); + + analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) + analysisInstance.connect(eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, teeFilter, TeeFilter::INPUT_PORT_NAME_EVENTS) + + try { + analysisInstance.run() + } catch (Exception e) { + e.printStackTrace + } + } + + def initRabbitMQ() { + val rabbitConfig = new Configuration() + rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost") + rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "kieker") + new RabbitMQReader(rabbitConfig, analysisInstance) + } + + def initCountingFilter() { + val config = new Configuration() + new CountingFilter(config, analysisInstance) + } + + def initTeeFilter() { + val config = new Configuration() + new TeeFilter(config, analysisInstance) + } +} \ No newline at end of file diff --git a/src/explorviz/worker/main/WorkerStarter.java b/src/explorviz/worker/main/WorkerStarter.java new file mode 100644 index 0000000000000000000000000000000000000000..81bbbf5eaa3501761b070575abbae63f30a710e9 --- /dev/null +++ b/src/explorviz/worker/main/WorkerStarter.java @@ -0,0 +1,8 @@ +package explorviz.worker.main; + +public class WorkerStarter { + + public static void main(String[] args) { + new WorkerController().start(); + } +} diff --git a/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java b/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..5afb3651ac4fa2ca2168f0ada6b5f5bbec4d989e --- /dev/null +++ b/src/kieker/analysis/plugin/filter/forward/CountingThroughputFilter.java @@ -0,0 +1,308 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.filter.forward; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.InputPort; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +//import kieker.common.util.ImmutableEntry; + +/** + * An instance of this class computes the throughput in terms of the number of events received per time unit. + * + * Note that only one of the input ports should be used in a configuration! + * + * @author Andre van Hoorn, Jan Waller + * + * @since 1.6 + */ +@Plugin( + description = "A filter computing the throughput in terms of the number of events received per time unit", + outputPorts = { + @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, + description = "Provides each incoming object"), + @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, + description = "Provides throughput per interval") + }, + configuration = { + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, + defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, + defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE), + @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, + defaultValue = "true") + }) +public final class CountingThroughputFilter extends AbstractFilterPlugin { + + /** + * The name of the input port receiving the records. + */ + public static final String INPUT_PORT_NAME_RECORDS = "inputRecords"; + /** + * The name of the input port receiving other objects. + */ + public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects"; + + /** + * The name of the output port delivering the received objects. + */ + public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents"; + public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval"; + + /** The name of the property determining the time unit. */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + /** The name of the property determining the interval size. */ + public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize"; + + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + /** + * If the value is set to false, the intervals are computed based on time since 1970-1-1. + */ + public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp"; + + /** + * The configuration property value for {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1 minute. + */ + public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000"; + + private static final Log LOG = LogFactory.getLog(CountingThroughputFilter.class); + + private volatile long firstIntervalStart = -1; + private final boolean intervalsBasedOn1stTstamp; + private final TimeUnit timeunit; + + /** + * For a key <i>k</i>, the {@link Queue} stores the number of events observed in the time interval <i>(k-intervalSize,k(</i>, i.e., + * the interval <b>excludes</b> the value <i>k</i>. + */ + private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>(); + + private final long intervalSize; + + private final AtomicLong currentCountForCurrentInterval = new AtomicLong(0); + + private volatile long firstTimestampInCurrentInterval = -1; // initialized with the first incoming event + private volatile long lastTimestampInCurrentInterval = -1; // initialized with the first incoming event + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public CountingThroughputFilter(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + + final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen + LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); + recordTimeunit = TimeUnit.NANOSECONDS; + } + this.timeunit = recordTimeunit; + + final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); + TimeUnit configTimeunit; + try { + configTimeunit = TimeUnit.valueOf(configTimeunitProperty); + } catch (final IllegalArgumentException ex) { + LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); + configTimeunit = this.timeunit; + } + + this.intervalSize = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE), configTimeunit); + this.intervalsBasedOn1stTstamp = configuration.getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP); + } + + /** + * {@inheritDoc} + */ + @Override + public final Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE, Long.toString(this.intervalSize)); + configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, Boolean.toString(this.intervalsBasedOn1stTstamp)); + return configuration; + } + + private void processEvent(final Object event, final long currentTime) { + final long startOfTimestampsInterval = this.computeFirstTimestampInInterval(currentTime); + final long endOfTimestampsInterval = this.computeLastTimestampInInterval(currentTime); + + synchronized (this) { + // Check if we need to close the current interval. + if (endOfTimestampsInterval > this.lastTimestampInCurrentInterval) { + if (this.firstTimestampInCurrentInterval >= 0) { // don't do this for the first record (only used for initialization of variables) + long currentCount = this.currentCountForCurrentInterval.get(); +// this.eventCountsPerInterval.add( +// new ImmutableEntry<Long, Long>( +// this.lastTimestampInCurrentInterval + 1, +// currentCount)); + super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount); + +// long numIntervalsElapsed = 1; // refined below +// numIntervalsElapsed = (endOfTimestampsInterval - this.lastTimestampInCurrentInterval) / this.intervalSize; +// if (numIntervalsElapsed > 1) { // NOPMD (AvoidDeeplyNestedIfStmts) +// for (int i = 1; i < numIntervalsElapsed; i++) { +// this.eventCountsPerInterval.add( +// new ImmutableEntry<Long, Long>((this.lastTimestampInCurrentInterval + (i * this.intervalSize)) + 1, 0L)); +// } +// } + + } + + this.firstTimestampInCurrentInterval = startOfTimestampsInterval; + this.lastTimestampInCurrentInterval = endOfTimestampsInterval; + this.currentCountForCurrentInterval.set(0); + } + + this.currentCountForCurrentInterval.incrementAndGet(); // only incremented in synchronized blocks + } + super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event); + } + + /** + * This method represents the input port for incoming records. + * + * @param record + * The next record. + */ + // #841 What happens with unordered events (i.e., timestamps before firstTimestampInCurrentInterval)? + @InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, + description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp") + public final void inputRecord(final IMonitoringRecord record) { + this.processEvent(record, record.getLoggingTimestamp()); + } + + /** + * This method represents the input port for incoming object. + * + * @param object + * The next object. + */ + @InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, + description = "Receives incoming objects to be considered for the throughput computation and uses the current system time") + public final void inputObjects(final Object object) { + this.processEvent(object, this.currentTime()); + } + + /** + * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970. + * + * @return The current time + */ + private long currentTime() { + return this.timeunit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + // #840 is this correct? it probably makes more sense to provide a copy. + public Collection<Entry<Long, Long>> getCountsPerInterval() { + return Collections.unmodifiableCollection(this.eventCountsPerInterval); + } + + /** + * Returns the first timestamp included in the interval that corresponds to the given timestamp. + * + * @param timestamp + * + * @return The timestamp in question. + */ + private long computeFirstTimestampInInterval(final long timestamp) { + final long referenceTimePoint; + + if (this.firstIntervalStart == -1) { + this.firstIntervalStart = timestamp; + } + + if (this.intervalsBasedOn1stTstamp) { + referenceTimePoint = this.firstIntervalStart; + } else { + referenceTimePoint = 0; + } + + return referenceTimePoint + (((timestamp - referenceTimePoint) / this.intervalSize) * this.intervalSize); + } + + /** + * Returns the last timestamp included in the interval that corresponds to the given timestamp. + * + * @param timestamp + * @return The timestamp in question. + */ + private long computeLastTimestampInInterval(final long timestamp) { + final long referenceTimePoint; + if (this.intervalsBasedOn1stTstamp) { + referenceTimePoint = this.firstIntervalStart; + } else { + referenceTimePoint = 0; + } + + return referenceTimePoint + (((((timestamp - referenceTimePoint) / this.intervalSize) + 1) * this.intervalSize) - 1); + } + + /** + * @return the intervalSize + */ + public long getIntervalSize() { + return this.intervalSize; + } + + /** + * @return the firstTimestampInCurrentInterval -1 if no record processed so far + */ + public long getFirstTimestampInCurrentInterval() { + return this.firstTimestampInCurrentInterval; + } + + /** + * @return the lastTimestampInCurrentInterval -1 if no record processed so far + */ + public long getLastTimestampInCurrentInterval() { + return this.lastTimestampInCurrentInterval; + } + + /** + * @return the currentCountForCurrentInterval + */ + public long getCurrentCountForCurrentInterval() { + return this.currentCountForCurrentInterval.get(); + } +} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java index 0b7af3c98b444a597c5ee2a71909a2461081b084..89d6125790ec0887e61f206037e44e39da7c0bde 100644 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java @@ -16,11 +16,9 @@ package kieker.analysis.plugin.reader.mq; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import com.rabbitmq.client.Channel; @@ -36,38 +34,39 @@ import kieker.analysis.plugin.annotation.Plugin; import kieker.analysis.plugin.annotation.Property; import kieker.analysis.plugin.reader.AbstractReaderPlugin; import kieker.common.configuration.Configuration; +import kieker.common.exception.MonitoringRecordException; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; +import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; import kieker.analysis.plugin.reader.mq.Bits; /** - * Reads monitoring records from the queue of an established RabbitMQ connection. + * Reads monitoring records from the queue of an established RabbitMQ + * connection. * * @author Santje Finke * * @since 1.8 */ -@Plugin(description = "A reader which reads records from a RabbitMQ queue", - dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", - outputPorts = { - @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") - }, - configuration = { - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") - - }) +@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = { + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") + +}) public final class RabbitMQReader extends AbstractReaderPlugin { /** The name of the output port delivering the received records. */ public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; /** The name of the configuration determining the RabbitMQ provider URL. */ public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; - /** The name of the configuration determining the RabbitMQ Queue (e.g. queue1). */ + /** + * The name of the configuration determining the RabbitMQ Queue (e.g. + * queue1). + */ public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; /** The username that is used to connect to a queue. */ public static final String CONFIG_PROPERTY_USER = "guest"; @@ -76,7 +75,10 @@ public final class RabbitMQReader extends AbstractReaderPlugin { /** The port that is used to connect to a queue. */ public static final String CONFIG_PROPERTY_PORT = "5672"; - static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD package for inner class + static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD + // package + // for inner + // class private final String providerUrl; private final String queueName; @@ -87,22 +89,31 @@ public final class RabbitMQReader extends AbstractReaderPlugin { private Channel channel; private ConnectionFactory factory; private QueueingConsumer normalConsumer; - private QueueingConsumer registryConsumer; + private final CountDownLatch cdLatch = new CountDownLatch(1); - - private final Map<Integer, String> stringRegistry = new HashMap<Integer, String>(); + + private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(); + + private RabbitMQRegistryConsumer registryConsumer; /** * Creates a new instance of this class using the given parameters. * * @param configuration - * The configuration used to initialize the whole reader. Keep in mind that the configuration should contain the following properties: + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. {@code username} - * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. {@code port} + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} + * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. + * {@code port} * </ul> * @param projectContext * The project context for this component. @@ -110,22 +121,30 @@ public final class RabbitMQReader extends AbstractReaderPlugin { * @throws IllegalArgumentException * If one of the properties is empty. */ - public RabbitMQReader(final Configuration configuration, final IProjectContext projectContext) throws IllegalArgumentException { + public RabbitMQReader(final Configuration configuration, + final IProjectContext projectContext) + throws IllegalArgumentException { super(configuration, projectContext); // Initialize the reader bases on the given configuration. - this.providerUrl = configuration.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); - this.queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + this.providerUrl = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); + this.queueName = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - this.password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + this.password = configuration + .getStringProperty(CONFIG_PROPERTY_PASSWORD); this.port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); // simple sanity check if ((this.providerUrl.length() == 0) || (this.queueName.length() == 0)) { - throw new IllegalArgumentException("RabbitMQReader has not sufficient parameters. providerUrl ('" + this.providerUrl + "') or destination ('" - + this.queueName + "'), is null"); + throw new IllegalArgumentException( + "RabbitMQReader has not sufficient parameters. providerUrl ('" + + this.providerUrl + "') or destination ('" + + this.queueName + "'), is null"); } - - RabbitMQRegistryConsumer registryConsumer = new RabbitMQRegistryConsumer(); + + registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, + "registryRecords", username, password, port); registryConsumer.start(); } @@ -133,12 +152,18 @@ public final class RabbitMQReader extends AbstractReaderPlugin { * Creates a new instance of this class using the given parameters. * * @param configuration - * The configuration used to initialize the whole reader. Keep in mind that the configuration should contain the following properties: + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. {@code username} + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} * </ul> * * @throws IllegalArgumentException @@ -147,7 +172,8 @@ public final class RabbitMQReader extends AbstractReaderPlugin { * @deprecated To be removed in Kieker 1.8. */ @Deprecated - public RabbitMQReader(final Configuration configuration) throws IllegalArgumentException { + public RabbitMQReader(final Configuration configuration) + throws IllegalArgumentException { this(configuration, null); } @@ -159,51 +185,26 @@ public final class RabbitMQReader extends AbstractReaderPlugin { public boolean read() { boolean retVal = true; try { - this.factory = new ConnectionFactory(); - this.factory.setHost(this.providerUrl); - this.factory.setPort(this.port); - this.factory.setConnectionTimeout(0); - this.factory.setUsername(this.username); - this.factory.setPassword(this.password); - - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - - this.channel.queueDeclare(this.queueName, false, false, false, null); - this.channel.queueDeclare("registryRecords", false, false, false, null); - - LOG.info("Listening to destination:" + this.queueName + " at " + this.providerUrl + " !\n***\n\n"); - - this.normalConsumer = new QueueingConsumer(this.channel); - this.channel.basicConsume(this.queueName, true, this.normalConsumer); - - this.registryConsumer = new QueueingConsumer(this.channel); - this.channel.basicConsume("registryRecords", true, this.registryConsumer); - + createConnectionFactory(); + connect(); while (!Thread.interrupted()) { if (!this.connection.isOpen() || !this.channel.isOpen()) { this.reconnect(); } - final QueueingConsumer.Delivery delivery = this.normalConsumer.nextDelivery(); + final QueueingConsumer.Delivery delivery = this.normalConsumer + .nextDelivery(); - final ByteArrayInputStream bain = new ByteArrayInputStream(delivery.getBody()); - final ObjectInputStream in = new ObjectInputStream(bain); - final Object message = in.readObject(); - - if ((message instanceof IMonitoringRecord) && (!this.deliverIndirect(OUTPUT_PORT_NAME_RECORDS, message))) { - LOG.error("deliverRecord returned false"); - } + byte[] batchedMessages = delivery.getBody(); + messagesfromByteArray(batchedMessages, 0); + this.normalConsumer.getChannel().basicAck( + delivery.getEnvelope().getDeliveryTag(), false); } - } catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) LOG.error("Error in read()", ex); retVal = false; - } catch (final ClassNotFoundException e) { - LOG.error("Error in read(): ClassNotFound Exception", e); - retVal = false; } catch (final ShutdownSignalException e) { LOG.error("Error in read(): ShutdownSignal Exception", e); retVal = false; @@ -224,140 +225,118 @@ public final class RabbitMQReader extends AbstractReaderPlugin { } return retVal; } - - private IMonitoringRecord fromByteArray(final byte[] b) { - final Class<?>[] recordTypes = monitoringRecord.getValueTypes(); - - int arraySize = 4 + 8; - - for (int i = 0; i < recordTypes.length; i++) { - if (recordTypes[i] == String.class) { - arraySize += 4; - } else if (recordTypes[i] == Integer.class - || recordTypes[i] == int.class) { - arraySize += 4; - } else if (recordTypes[i] == Long.class - || recordTypes[i] == long.class) { - arraySize += 8; - } else if (recordTypes[i] == Float.class - || recordTypes[i] == float.class) { - arraySize += 4; - } else if (recordTypes[i] == Double.class - || recordTypes[i] == double.class) { - arraySize += 8; - } else if (recordTypes[i] == Byte.class - || recordTypes[i] == byte.class) { - arraySize += 1; - } else if (recordTypes[i] == Short.class - || recordTypes[i] == short.class) { - arraySize += 2; - } else if (recordTypes[i] == Boolean.class - || recordTypes[i] == boolean.class) { - arraySize += 1; - } else { - arraySize += 1; - } + + private void createConnectionFactory() { + this.factory = new ConnectionFactory(); + this.factory.setHost(this.providerUrl); + this.factory.setPort(this.port); + this.factory.setConnectionTimeout(0); + this.factory.setUsername(this.username); + this.factory.setPassword(this.password); + } + + private void connect() throws IOException { + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + + this.channel.queueDeclare(this.queueName, false, false, false, null); + + this.normalConsumer = new QueueingConsumer(this.channel); + this.channel.basicConsume(this.queueName, false, this.normalConsumer); + this.channel.basicQos(10); + } + + private void reconnect() { + try { + connect(); + } catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); } + } + + private void messagesfromByteArray(final byte[] b, int initOffset) { + int offset = initOffset; + + while (Bits.getInt(b, offset) != -1) { + int firstValue = Bits.getInt(b, offset); + offset += 4; - byte[] result = new byte[arraySize]; - int offset = 0; + String classname = this.getStringFromRegistry(firstValue); + + Class<? extends IMonitoringRecord> clazz = null; + Class<?>[] recordTypes = null; + try { + clazz = AbstractMonitoringRecord.classForName(classname); + recordTypes = AbstractMonitoringRecord.typesForClass(clazz); + } catch (MonitoringRecordException e) { + LOG.error("could not create record", e); + } - Bits.putInt(result, offset, this.monitoringController - .getIdForString(monitoringRecord.getClass().getName())); - offset += 4; + final long loggingTimestamp = Bits.getLong(b, offset); + offset += 8; - Bits.putLong(result, offset, monitoringRecord.getLoggingTimestamp()); - offset += 8; - final Object[] recordFields = monitoringRecord.toArray(); + final Object[] values = new Object[recordTypes.length]; + int valueIndex = 0; - for (int i = 0; i < recordFields.length; i++) { - if (recordFields[i] == null) { + for (int i = 0; i < recordTypes.length; i++) { if (recordTypes[i] == String.class) { - Bits.putInt(result, offset, - this.monitoringController.getIdForString("")); + values[valueIndex] = this.getStringFromRegistry(Bits + .getInt(b, offset)); offset += 4; + valueIndex++; } else if (recordTypes[i] == Integer.class || recordTypes[i] == int.class) { - Bits.putInt(result, offset, 0); + values[valueIndex] = Bits.getInt(b, offset); offset += 4; + valueIndex++; } else if (recordTypes[i] == Long.class || recordTypes[i] == long.class) { - Bits.putLong(result, offset, 0L); + values[valueIndex] = Bits.getLong(b, offset); offset += 8; + valueIndex++; } else if (recordTypes[i] == Float.class || recordTypes[i] == float.class) { - Bits.putFloat(result, offset, 0); + values[valueIndex] = Bits.getFloat(b, offset); offset += 4; + valueIndex++; } else if (recordTypes[i] == Double.class || recordTypes[i] == double.class) { - Bits.putDouble(result, offset, 0); + values[valueIndex] = Bits.getDouble(b, offset); offset += 8; + valueIndex++; } else if (recordTypes[i] == Byte.class || recordTypes[i] == byte.class) { - Bits.putByte(result, offset, (byte) 0); + values[valueIndex] = Bits.getByte(b, offset); offset += 1; + valueIndex++; } else if (recordTypes[i] == Short.class || recordTypes[i] == short.class) { - Bits.putShort(result, offset, (short) 0); + values[valueIndex] = Bits.getShort(b, offset); offset += 2; - } else if (recordTypes[i] == Boolean.class - || recordTypes[i] == boolean.class) { - Bits.putBoolean(result, offset, false); + valueIndex++; + } else if (recordTypes[i] == boolean.class + || recordTypes[i] == Boolean.class) { + values[valueIndex] = Bits.getBoolean(b, offset); offset += 1; + valueIndex++; } else { - LOG.warn("Record with unsupported recordField of type " - + recordFields[i].getClass()); - Bits.putByte(result, offset, (byte) 0); + values[valueIndex] = Bits.getByte(b, offset); offset += 1; + valueIndex++; } - } else if (recordFields[i] instanceof String) { - Bits.putInt(result, offset, this.monitoringController - .getIdForString((String) recordFields[i])); - offset += 4; - } else if (recordFields[i] instanceof Integer) { - Bits.putInt(result, offset, (Integer) recordFields[i]); - offset += 4; - } else if (recordFields[i] instanceof Long) { - Bits.putLong(result, offset, (Long) recordFields[i]); - offset += 8; - } else if (recordFields[i] instanceof Float) { - Bits.putFloat(result, offset, (Float) recordFields[i]); - offset += 4; - } else if (recordFields[i] instanceof Double) { - Bits.putDouble(result, offset, (Double) recordFields[i]); - offset += 8; - } else if (recordFields[i] instanceof Byte) { - Bits.putByte(result, offset, (Byte) recordFields[i]); - offset += 1; - } else if (recordFields[i] instanceof Short) { - Bits.putShort(result, offset, (Short) recordFields[i]); - offset += 2; - } else if (recordFields[i] instanceof Boolean) { - Bits.putBoolean(result, offset, (Boolean) recordFields[i]); - offset += 1; - } else { - LOG.warn("Record with unsupported recordField of type " - + recordFields[i].getClass()); - Bits.putByte(result, offset, (byte) 0); - offset += 1; } - } + IMonitoringRecord record = null; + try { + record = AbstractMonitoringRecord + .createFromArray(clazz, values); + } catch (MonitoringRecordException e) { + LOG.error("could not create record", e); + } + record.setLoggingTimestamp(loggingTimestamp); - return result; - } - - /** - * Establishes a connection to a rabbitMQ channel with the current connection informationen. - */ - private void reconnect() { - try { - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - this.channel.queueDeclare(this.queueName, false, false, false, null); - this.normalConsumer = new QueueingConsumer(this.channel); - this.channel.basicConsume(this.queueName, true, this.normalConsumer); - } catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); + if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { + LOG.error("deliverRecord returned false"); + } } } @@ -365,15 +344,12 @@ public final class RabbitMQReader extends AbstractReaderPlugin { this.cdLatch.countDown(); } - final boolean deliverIndirect(final String outputPortName, final Object data) { // NOPMD (package visible for inner class) - return super.deliver(outputPortName, data); - } - /** * {@inheritDoc} */ public void terminate(final boolean error) { LOG.info("Shutdown of RabbitMQReader requested."); + registryConsumer.interrupt(); this.unblock(); } @@ -384,11 +360,36 @@ public final class RabbitMQReader extends AbstractReaderPlugin { public Configuration getCurrentConfiguration() { final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, this.providerUrl); + configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, + this.providerUrl); configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queueName); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); configuration.setProperty(CONFIG_PROPERTY_USER, this.username); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); return configuration; } + + public void addToRegistry(Integer key, String value) { + stringRegistry.put(key, value); + + synchronized (this) { + this.notifyAll(); + } + } + + private String getStringFromRegistry(Integer id) { + String result = stringRegistry.get(id); + while (result == null) { + try { + synchronized (this) { + this.wait(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + result = stringRegistry.get(id); + } + + return result; + } } diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java index e90c6fceb34cd4b368d8101aa44bd82ecaa60c31..357caa0aa53f6f74ff91911a7ca30056652bf2e9 100644 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java @@ -1,9 +1,124 @@ package kieker.analysis.plugin.reader.mq; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; + +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.misc.RegistryRecord; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ConsumerCancelledException; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownSignalException; + public class RabbitMQRegistryConsumer extends Thread { + static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD package for inner class + + private final RabbitMQReader parent; + + private Connection connection; + private Channel channel; + private ConnectionFactory factory; + private QueueingConsumer registryConsumer; + + private String providerUrl; + private String queueName; + private String password; + private String username; + private int port; + + public RabbitMQRegistryConsumer(RabbitMQReader parent, String providerUrl, String queueName, String password, String username, int port) { + this.parent = parent; + this.providerUrl = providerUrl; + this.queueName = queueName; + this.password = password; + this.username = username; + this.port = port; + + } + @Override public void run() { - + try { + createConnectionFactory(); + + connect(); + + while (!Thread.interrupted()) { + + if (!this.connection.isOpen() || !this.channel.isOpen()) { + this.reconnect(); + } + + final QueueingConsumer.Delivery delivery = this.registryConsumer.nextDelivery(); + + final Object message = readObjectFromBytes(delivery.getBody()); + + + if (message instanceof RegistryRecord) { + @SuppressWarnings("unchecked") + final RegistryRecord<String> record = (RegistryRecord<String>) message; + parent.addToRegistry(record.getId(), record.getObject()); + } + } + } catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + LOG.error("Error in read()", ex); + } catch (final ClassNotFoundException e) { + LOG.error("Error in read(): ClassNotFound Exception", e); + } catch (final ShutdownSignalException e) { + LOG.error("Error in read(): ShutdownSignal Exception", e); + } catch (final ConsumerCancelledException e) { + LOG.error("Error in read(): ConsumerCancelled Exception", e); + } catch (final InterruptedException e) { + LOG.error("Error in read(): Interrupted Exception", e); + } finally { + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (final IOException e) { + LOG.error("Error in read()", e); + } + } + } + + private void createConnectionFactory() { + this.factory = new ConnectionFactory(); + this.factory.setHost(this.providerUrl); + this.factory.setPort(this.port); + this.factory.setConnectionTimeout(0); + this.factory.setUsername(this.username); + this.factory.setPassword(this.password); + } + + private void connect() throws IOException { + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + + this.channel.queueDeclare(this.queueName, false, false, false, null); + + this.registryConsumer = new QueueingConsumer(this.channel); + this.channel.basicConsume(this.queueName, true, this.registryConsumer); + } + + private void reconnect() { + try { + connect(); + } catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); + } + } + + private Object readObjectFromBytes(final byte[] bs) + throws IOException, ClassNotFoundException { + final ByteArrayInputStream bain = new ByteArrayInputStream(bs); + final ObjectInputStream in = new ObjectInputStream(bain); + final Object message = in.readObject(); + return message; } }