diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java index 61251cb622d1bdc7a790e6a4b5d46cf73c014203..f5f9de2117767c179e18ca57d09f3c81cd882f7b 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java @@ -28,42 +28,23 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.reader.AbstractReaderPlugin; -import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; import kieker.common.record.IMonitoringRecord; import kieker.common.util.registry.ILookup; import kieker.common.util.registry.Lookup; /** - * Reader plugin that reads monitoring records from an AMQP queue. + * Logic module for the reader stage that reads monitoring records from an AMQP queue. * - * @author Holger Knoche + * @author Holger Knoche, Lars Erik Bluemke * * @since 1.12 */ -@Plugin(description = "A plugin that reads monitoring records from an AMQP queue", outputPorts = { - @OutputPort(name = AMQPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { - IMonitoringRecord.class }, description = "Output port of the AMQP reader") }, configuration = { - @Property(name = AMQPReader.CONFIG_PROPERTY_URI, defaultValue = "amqp://localhost", description = "Server URI of the AMQP server"), - @Property(name = AMQPReader.CONFIG_PROPERTY_QUEUENAME, defaultValue = "kieker", description = "AMQP queue name"), - @Property(name = AMQPReader.CONFIG_PROPERTY_HEARTBEAT, defaultValue = "60", description = "Heartbeat interval") - }) -public final class AMQPReader extends AbstractReaderPlugin { +public final class AMQPReaderLogicModule { /** The name of the output port delivering the received records. */ public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - /** The name of the configuration property for the server URI. */ - public static final String CONFIG_PROPERTY_URI = "uri"; - /** The name of the configuration property for the AMQP queue name. */ - public static final String CONFIG_PROPERTY_QUEUENAME = "queueName"; - /** The name of the configuration property for the heartbeat timeout. */ - public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat"; - /** ID for registry records. */ private static final byte REGISTRY_RECORD_ID = (byte) 0xFF; @@ -89,24 +70,32 @@ public final class AMQPReader extends AbstractReaderPlugin { private volatile boolean terminated; private volatile boolean threadsStarted; + private final Log log; + private final AMQPReader producerStage; + /** - * Creates a new AMQP reader with the given configuration in the given context. + * Creates a new logic module for an AMQP reader. * - * @param configuration - * The configuration for this reader - * @param projectContext - * The project context for this component + * @param uri + * The name of the configuration property for the server URI. + * @param queueName + * The name of the configuration property for the AMQP queue name. + * @param heartbeat + * The name of the configuration property for the heartbeat timeout. + * @param log + * Kieker log. + * @param producerStage + * The actual teetime stage which uses this class. */ - public AMQPReader(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - this.uri = this.configuration.getStringProperty(CONFIG_PROPERTY_URI); - this.queueName = this.configuration.getStringProperty(CONFIG_PROPERTY_QUEUENAME); - this.heartbeat = this.configuration.getIntProperty(CONFIG_PROPERTY_HEARTBEAT); + public AMQPReaderLogicModule(final String uri, final String queueName, final int heartbeat, final Log log, final AMQPReader producerStage) { + this.uri = uri; + this.queueName = queueName; + this.heartbeat = heartbeat; + this.log = log; + this.producerStage = producerStage; } - @Override - public boolean init() { + public void init() { try { this.connection = this.createConnection(); this.channel = this.connection.createChannel(); @@ -124,26 +113,20 @@ public final class AMQPReader extends AbstractReaderPlugin { this.regularRecordHandlerThread.setDaemon(true); } catch (final KeyManagementException e) { this.handleInitializationError(e); - return false; } catch (final NoSuchAlgorithmException e) { this.handleInitializationError(e); - return false; } catch (final IOException e) { this.handleInitializationError(e); - return false; } catch (final TimeoutException e) { this.handleInitializationError(e); - return false; } catch (final URISyntaxException e) { this.handleInitializationError(e); - return false; } - return super.init(); } private void handleInitializationError(final Throwable e) { - LOG.error("An error occurred initializing the AMQP reader: " + e); + log.error("An error occurred initializing the AMQP reader: " + e); } private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { @@ -155,18 +138,6 @@ public final class AMQPReader extends AbstractReaderPlugin { return connectionFactory.newConnection(); } - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - - configuration.setProperty(CONFIG_PROPERTY_URI, this.uri); - configuration.setProperty(CONFIG_PROPERTY_QUEUENAME, this.queueName); - configuration.setProperty(CONFIG_PROPERTY_HEARTBEAT, Integer.toString(this.heartbeat)); - - return configuration; - } - - @Override public boolean read() { // Start the worker threads, if necessary if (!this.threadsStarted) { @@ -209,7 +180,6 @@ public final class AMQPReader extends AbstractReaderPlugin { return true; } - @Override public void terminate(final boolean error) { try { this.terminated = true; @@ -220,7 +190,7 @@ public final class AMQPReader extends AbstractReaderPlugin { } protected void deliverRecord(final IMonitoringRecord monitoringRecord) { - this.deliver(OUTPUT_PORT_NAME_RECORDS, monitoringRecord); + this.producerStage.getOutputPort().send(monitoringRecord); } } diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java index 683f9290aacf9dc8ba1c3ef22947987cd4a05f76..98c17fbcc7dc5c439868c240ec781e92c22a7908 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java @@ -44,7 +44,7 @@ public class RegularRecordHandler implements Runnable { private final ILookup<String> stringRegistry; private final CachedRecordFactoryCatalog cachedRecordFactoryCatalog = CachedRecordFactoryCatalog.getInstance(); - private final AMQPReader reader; + private final AMQPReaderLogicModule reader; private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(DEFAULT_QUEUE_SIZE); @@ -56,7 +56,7 @@ public class RegularRecordHandler implements Runnable { * @param stringRegistry * The string registry to use */ - public RegularRecordHandler(final AMQPReader reader, final ILookup<String> stringRegistry) { + public RegularRecordHandler(final AMQPReaderLogicModule reader, final ILookup<String> stringRegistry) { this.reader = reader; this.stringRegistry = stringRegistry; }