diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java index 61251cb622d1bdc7a790e6a4b5d46cf73c014203..625d87e506cca8e1bf01c099edc958a442ee2b56 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java @@ -16,211 +16,38 @@ package kieker.analysis.plugin.reader.amqp; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.reader.AbstractReaderPlugin; -import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; import kieker.common.record.IMonitoringRecord; -import kieker.common.util.registry.ILookup; -import kieker.common.util.registry.Lookup; + +import teetime.framework.AbstractProducerStage; /** - * Reader plugin that reads monitoring records from an AMQP queue. + * Reader stage that reads monitoring records from an AMQP queue. * - * @author Holger Knoche + * @author Lars Erik Bluemke * - * @since 1.12 */ -@Plugin(description = "A plugin that reads monitoring records from an AMQP queue", outputPorts = { - @OutputPort(name = AMQPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { - IMonitoringRecord.class }, description = "Output port of the AMQP reader") }, configuration = { - @Property(name = AMQPReader.CONFIG_PROPERTY_URI, defaultValue = "amqp://localhost", description = "Server URI of the AMQP server"), - @Property(name = AMQPReader.CONFIG_PROPERTY_QUEUENAME, defaultValue = "kieker", description = "AMQP queue name"), - @Property(name = AMQPReader.CONFIG_PROPERTY_HEARTBEAT, defaultValue = "60", description = "Heartbeat interval") - }) -public final class AMQPReader extends AbstractReaderPlugin { - - /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - - /** The name of the configuration property for the server URI. */ - public static final String CONFIG_PROPERTY_URI = "uri"; - /** The name of the configuration property for the AMQP queue name. */ - public static final String CONFIG_PROPERTY_QUEUENAME = "queueName"; - /** The name of the configuration property for the heartbeat timeout. */ - public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat"; - - /** ID for registry records. */ - private static final byte REGISTRY_RECORD_ID = (byte) 0xFF; - - /** ID for regular records. */ - private static final byte REGULAR_RECORD_ID = (byte) 0x01; - - private final String uri; - private final String queueName; - private final int heartbeat; - - private volatile Connection connection; - private volatile Channel channel; - private volatile QueueingConsumer consumer; - - private final ILookup<String> stringRegistry = new Lookup<String>(); - - private volatile Thread registryRecordHandlerThread; - private volatile RegistryRecordHandler registryRecordHandler; - private volatile RegularRecordHandler regularRecordHandler; +public class AMQPReader extends AbstractProducerStage<IMonitoringRecord> { - private volatile Thread regularRecordHandlerThread; - - private volatile boolean terminated; - private volatile boolean threadsStarted; + private final AMQPReaderLogicModule logicModule; /** - * Creates a new AMQP reader with the given configuration in the given context. + * Creates a new AMQP reader. * - * @param configuration - * The configuration for this reader - * @param projectContext - * The project context for this component + * @param uri + * The name of the configuration property for the server URI. + * @param queueName + * The name of the configuration property for the AMQP queue name. + * @param heartbeat + * The name of the configuration property for the heartbeat timeout. */ - public AMQPReader(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - this.uri = this.configuration.getStringProperty(CONFIG_PROPERTY_URI); - this.queueName = this.configuration.getStringProperty(CONFIG_PROPERTY_QUEUENAME); - this.heartbeat = this.configuration.getIntProperty(CONFIG_PROPERTY_HEARTBEAT); - } - - @Override - public boolean init() { - try { - this.connection = this.createConnection(); - this.channel = this.connection.createChannel(); - this.consumer = new QueueingConsumer(this.channel); - - // Set up record handlers - this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry); - this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry); - - // Set up threads - this.registryRecordHandlerThread = new Thread(this.registryRecordHandler); - this.registryRecordHandlerThread.setDaemon(true); - - this.regularRecordHandlerThread = new Thread(this.regularRecordHandler); - this.regularRecordHandlerThread.setDaemon(true); - } catch (final KeyManagementException e) { - this.handleInitializationError(e); - return false; - } catch (final NoSuchAlgorithmException e) { - this.handleInitializationError(e); - return false; - } catch (final IOException e) { - this.handleInitializationError(e); - return false; - } catch (final TimeoutException e) { - this.handleInitializationError(e); - return false; - } catch (final URISyntaxException e) { - this.handleInitializationError(e); - return false; - } - - return super.init(); - } - - private void handleInitializationError(final Throwable e) { - LOG.error("An error occurred initializing the AMQP reader: " + e); - } - - private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { - final ConnectionFactory connectionFactory = new ConnectionFactory(); - - connectionFactory.setUri(this.uri); - connectionFactory.setRequestedHeartbeat(this.heartbeat); - - return connectionFactory.newConnection(); + public AMQPReader(final String uri, final String queueName, final int heartbeat, final Log log) { + this.logicModule = new AMQPReaderLogicModule(uri, queueName, heartbeat, log, this); } @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - - configuration.setProperty(CONFIG_PROPERTY_URI, this.uri); - configuration.setProperty(CONFIG_PROPERTY_QUEUENAME, this.queueName); - configuration.setProperty(CONFIG_PROPERTY_HEARTBEAT, Integer.toString(this.heartbeat)); - - return configuration; + protected void execute() { + this.logicModule.init(); + this.logicModule.read(); } - - @Override - public boolean read() { - // Start the worker threads, if necessary - if (!this.threadsStarted) { - this.registryRecordHandlerThread.start(); - this.regularRecordHandlerThread.start(); - this.threadsStarted = true; - } - - try { - this.channel.basicConsume(this.queueName, true, this.consumer); - - while (!this.terminated) { - final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); - final byte[] body = delivery.getBody(); - - final ByteBuffer buffer = ByteBuffer.wrap(body); - final byte recordType = buffer.get(); - - // Dispatch to the appropriate handlers - switch (recordType) { - case REGISTRY_RECORD_ID: - this.registryRecordHandler.enqueueRegistryRecord(buffer); - break; - case REGULAR_RECORD_ID: - this.regularRecordHandler.enqueueRegularRecord(buffer); - break; - default: - this.log.error(String.format("Unknown record type: %02x", recordType)); - break; - } - } - } catch (final IOException e) { - this.log.error("Error while reading from queue " + this.queueName, e); - return false; - } catch (final InterruptedException e) { - this.log.error("Consumer was interrupted on queue " + this.queueName, e); - return false; - } - - return true; - } - - @Override - public void terminate(final boolean error) { - try { - this.terminated = true; - this.connection.close(); - } catch (final IOException e) { - this.log.error("IO error while trying to close the connection.", e); - } - } - - protected void deliverRecord(final IMonitoringRecord monitoringRecord) { - this.deliver(OUTPUT_PORT_NAME_RECORDS, monitoringRecord); - } - } diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java new file mode 100644 index 0000000000000000000000000000000000000000..61251cb622d1bdc7a790e6a4b5d46cf73c014203 --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java @@ -0,0 +1,226 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.amqp; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.reader.AbstractReaderPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; + +/** + * Reader plugin that reads monitoring records from an AMQP queue. + * + * @author Holger Knoche + * + * @since 1.12 + */ +@Plugin(description = "A plugin that reads monitoring records from an AMQP queue", outputPorts = { + @OutputPort(name = AMQPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { + IMonitoringRecord.class }, description = "Output port of the AMQP reader") }, configuration = { + @Property(name = AMQPReader.CONFIG_PROPERTY_URI, defaultValue = "amqp://localhost", description = "Server URI of the AMQP server"), + @Property(name = AMQPReader.CONFIG_PROPERTY_QUEUENAME, defaultValue = "kieker", description = "AMQP queue name"), + @Property(name = AMQPReader.CONFIG_PROPERTY_HEARTBEAT, defaultValue = "60", description = "Heartbeat interval") + }) +public final class AMQPReader extends AbstractReaderPlugin { + + /** The name of the output port delivering the received records. */ + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + + /** The name of the configuration property for the server URI. */ + public static final String CONFIG_PROPERTY_URI = "uri"; + /** The name of the configuration property for the AMQP queue name. */ + public static final String CONFIG_PROPERTY_QUEUENAME = "queueName"; + /** The name of the configuration property for the heartbeat timeout. */ + public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat"; + + /** ID for registry records. */ + private static final byte REGISTRY_RECORD_ID = (byte) 0xFF; + + /** ID for regular records. */ + private static final byte REGULAR_RECORD_ID = (byte) 0x01; + + private final String uri; + private final String queueName; + private final int heartbeat; + + private volatile Connection connection; + private volatile Channel channel; + private volatile QueueingConsumer consumer; + + private final ILookup<String> stringRegistry = new Lookup<String>(); + + private volatile Thread registryRecordHandlerThread; + private volatile RegistryRecordHandler registryRecordHandler; + private volatile RegularRecordHandler regularRecordHandler; + + private volatile Thread regularRecordHandlerThread; + + private volatile boolean terminated; + private volatile boolean threadsStarted; + + /** + * Creates a new AMQP reader with the given configuration in the given context. + * + * @param configuration + * The configuration for this reader + * @param projectContext + * The project context for this component + */ + public AMQPReader(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + + this.uri = this.configuration.getStringProperty(CONFIG_PROPERTY_URI); + this.queueName = this.configuration.getStringProperty(CONFIG_PROPERTY_QUEUENAME); + this.heartbeat = this.configuration.getIntProperty(CONFIG_PROPERTY_HEARTBEAT); + } + + @Override + public boolean init() { + try { + this.connection = this.createConnection(); + this.channel = this.connection.createChannel(); + this.consumer = new QueueingConsumer(this.channel); + + // Set up record handlers + this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry); + this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry); + + // Set up threads + this.registryRecordHandlerThread = new Thread(this.registryRecordHandler); + this.registryRecordHandlerThread.setDaemon(true); + + this.regularRecordHandlerThread = new Thread(this.regularRecordHandler); + this.regularRecordHandlerThread.setDaemon(true); + } catch (final KeyManagementException e) { + this.handleInitializationError(e); + return false; + } catch (final NoSuchAlgorithmException e) { + this.handleInitializationError(e); + return false; + } catch (final IOException e) { + this.handleInitializationError(e); + return false; + } catch (final TimeoutException e) { + this.handleInitializationError(e); + return false; + } catch (final URISyntaxException e) { + this.handleInitializationError(e); + return false; + } + + return super.init(); + } + + private void handleInitializationError(final Throwable e) { + LOG.error("An error occurred initializing the AMQP reader: " + e); + } + + private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { + final ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setUri(this.uri); + connectionFactory.setRequestedHeartbeat(this.heartbeat); + + return connectionFactory.newConnection(); + } + + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + + configuration.setProperty(CONFIG_PROPERTY_URI, this.uri); + configuration.setProperty(CONFIG_PROPERTY_QUEUENAME, this.queueName); + configuration.setProperty(CONFIG_PROPERTY_HEARTBEAT, Integer.toString(this.heartbeat)); + + return configuration; + } + + @Override + public boolean read() { + // Start the worker threads, if necessary + if (!this.threadsStarted) { + this.registryRecordHandlerThread.start(); + this.regularRecordHandlerThread.start(); + this.threadsStarted = true; + } + + try { + this.channel.basicConsume(this.queueName, true, this.consumer); + + while (!this.terminated) { + final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); + final byte[] body = delivery.getBody(); + + final ByteBuffer buffer = ByteBuffer.wrap(body); + final byte recordType = buffer.get(); + + // Dispatch to the appropriate handlers + switch (recordType) { + case REGISTRY_RECORD_ID: + this.registryRecordHandler.enqueueRegistryRecord(buffer); + break; + case REGULAR_RECORD_ID: + this.regularRecordHandler.enqueueRegularRecord(buffer); + break; + default: + this.log.error(String.format("Unknown record type: %02x", recordType)); + break; + } + } + } catch (final IOException e) { + this.log.error("Error while reading from queue " + this.queueName, e); + return false; + } catch (final InterruptedException e) { + this.log.error("Consumer was interrupted on queue " + this.queueName, e); + return false; + } + + return true; + } + + @Override + public void terminate(final boolean error) { + try { + this.terminated = true; + this.connection.close(); + } catch (final IOException e) { + this.log.error("IO error while trying to close the connection.", e); + } + } + + protected void deliverRecord(final IMonitoringRecord monitoringRecord) { + this.deliver(OUTPUT_PORT_NAME_RECORDS, monitoringRecord); + } + +}