diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java b/src/main/java/kieker/analysis/plugin/reader/AbstractReader.java similarity index 57% rename from src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java rename to src/main/java/kieker/analysis/plugin/reader/AbstractReader.java index dcdec0ef5dacac43ef522d6cf6b21e95187204fd..71d7687827e5fe13ccef49da6d477bb8059e46ef 100644 --- a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java +++ b/src/main/java/kieker/analysis/plugin/reader/AbstractReader.java @@ -14,40 +14,38 @@ * limitations under the License. ***************************************************************************/ -package kieker.analysis.plugin.reader.jms; +package kieker.analysis.plugin.reader; -import kieker.common.logging.Log; +import kieker.analysis.plugin.annotation.Plugin; import kieker.common.record.IMonitoringRecord; import teetime.framework.AbstractProducerStage; /** - * Reads monitoring records from a (remote or local) JMS queue by using the - * read-method from JMSReaderLogicModule. JMSReaderLogicModule also delivers - * read records to the output port. + * This abstract class defines the basic structure of each reader. Each reader has its reader logic separated + * from the actual TeeTime stage. The reader logic is mainly based on the original Kieker stages. * * @author Lars Erik Bluemke */ -public class JMSReaderStage extends AbstractProducerStage<IMonitoringRecord> { +@Plugin +public abstract class AbstractReader extends AbstractProducerStage<IMonitoringRecord> { - private final JMSReader jmsReader; - - public JMSReaderStage(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log) { - this.jmsReader = new JMSReader(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, log, this); - } + protected IReaderLogic readerLogic; @Override protected void execute() { - jmsReader.read(); - this.terminateStage(); + readerLogic.read(); } - protected void deliverRecord(final IMonitoringRecord monitoringRecord) { + /** Called from reader logic to send the read records to the output port */ + public void deliverRecord(final IMonitoringRecord monitoringRecord) { this.getOutputPort().send(monitoringRecord); } - /** Terminates the JMSReader by returning from read method. */ + /** Terminates the reader logic by returning from read method. */ public void terminate(final boolean error) { - this.jmsReader.terminate(error); + readerLogic.terminate(error); + this.terminateStage(); } + } diff --git a/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java new file mode 100644 index 0000000000000000000000000000000000000000..dd50ec2ff554df0a4b410aa8efaadd6935cc9f95 --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java @@ -0,0 +1,56 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader; + +/** + * This is the interface for reader plugins. + * + * @author Andre van Hoorn + * + * @since 0.95a + */ +public interface IReaderLogic { + + /** + * Starts the reader. This method is intended to be a blocking operation, + * i.e., it is assumed that reading has finished before this method returns. + * The method should indicate an error by the return value false. + * + * In asynchronous scenarios, the {@link kieker.analysis.plugin.IPlugin#terminate(boolean)} method can be used + * to initiate the termination of this method. + * + * @return true if reading was successful; false if an error occurred + * + * @since 1.2 + */ + public boolean read(); + + /** + * Initiates a termination of the plugin. This method is only used by the + * framework and should not be called manually. + * Use the method {@link kieker.analysis.AnalysisController#terminate(boolean)} instead. + * + * After receiving this notification, the plugin should terminate any running + * methods, e.g., read for readers. + * + * @param error + * Determines whether the plugin is terminated due to an error or not. + * + * @since 1.6 + */ + public void terminate(final boolean error); +} diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java index 92ae48fb07f711da4409dd4414b65e6faa237972..59575fde545f0261b947df1609a43092ddd679b8 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java @@ -16,66 +16,19 @@ package kieker.analysis.plugin.reader.amqp; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.ShutdownSignalException; - +import kieker.analysis.plugin.reader.AbstractReader; import kieker.common.logging.Log; -import kieker.common.record.IMonitoringRecord; -import kieker.common.util.registry.ILookup; -import kieker.common.util.registry.Lookup; /** - * Logic module for the reader stage that reads monitoring records from an AMQP queue. + * Reader stage that reads monitoring records from an AMQP queue. * - * @author Holger Knoche, Lars Erik Bluemke + * @author Lars Erik Bluemke * - * @since 1.12 */ -public final class AMQPReader { - - /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - - /** ID for registry records. */ - private static final byte REGISTRY_RECORD_ID = (byte) 0xFF; - - /** ID for regular records. */ - private static final byte REGULAR_RECORD_ID = (byte) 0x01; - - private final String uri; - private final String queueName; - private final int heartbeat; - - private volatile Connection connection; - private volatile Channel channel; - private volatile QueueingConsumer consumer; - - private final ILookup<String> stringRegistry = new Lookup<String>(); - - private volatile Thread registryRecordHandlerThread; - private volatile RegistryRecordHandler registryRecordHandler; - private volatile RegularRecordHandler regularRecordHandler; - - private volatile Thread regularRecordHandlerThread; - - private volatile boolean terminated; - private volatile boolean threadsStarted; - - private final Log log; - private final AMQPReaderStage amqpReaderStage; +public class AMQPReader extends AbstractReader { /** - * Creates a new logic module for an AMQP reader. + * Creates a new AMQP reader. * * @param uri * The name of the configuration property for the server URI. @@ -83,118 +36,9 @@ public final class AMQPReader { * The name of the configuration property for the AMQP queue name. * @param heartbeat * The name of the configuration property for the heartbeat timeout. - * @param log - * Kieker log. - * @param amqpReaderStage - * The actual teetime stage which uses this class. */ - public AMQPReader(final String uri, final String queueName, final int heartbeat, final Log log, final AMQPReaderStage amqpReaderStage) { - this.uri = uri; - this.queueName = queueName; - this.heartbeat = heartbeat; - this.log = log; - this.amqpReaderStage = amqpReaderStage; - } - - public void init() { - try { - this.connection = this.createConnection(); - this.channel = this.connection.createChannel(); - this.consumer = new QueueingConsumer(this.channel); - - // Set up record handlers - this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry); - this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry); - - // Set up threads - this.registryRecordHandlerThread = new Thread(this.registryRecordHandler); - this.registryRecordHandlerThread.setDaemon(true); - - this.regularRecordHandlerThread = new Thread(this.regularRecordHandler); - this.regularRecordHandlerThread.setDaemon(true); - } catch (final KeyManagementException e) { - this.handleInitializationError(e); - } catch (final NoSuchAlgorithmException e) { - this.handleInitializationError(e); - } catch (final IOException e) { - this.handleInitializationError(e); - } catch (final TimeoutException e) { - this.handleInitializationError(e); - } catch (final URISyntaxException e) { - this.handleInitializationError(e); - } - - } - - private void handleInitializationError(final Throwable e) { - log.error("An error occurred initializing the AMQP reader: " + e); - } - - private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { - final ConnectionFactory connectionFactory = new ConnectionFactory(); - - connectionFactory.setUri(this.uri); - connectionFactory.setRequestedHeartbeat(this.heartbeat); - - return connectionFactory.newConnection(); - } - - public boolean read() { - // Start the worker threads, if necessary - if (!this.threadsStarted) { - this.registryRecordHandlerThread.start(); - this.regularRecordHandlerThread.start(); - this.threadsStarted = true; - } - - try { - this.channel.basicConsume(this.queueName, true, this.consumer); - - while (!this.terminated) { - final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); - final byte[] body = delivery.getBody(); - - final ByteBuffer buffer = ByteBuffer.wrap(body); - final byte recordType = buffer.get(); - - // Dispatch to the appropriate handlers - switch (recordType) { - case REGISTRY_RECORD_ID: - this.registryRecordHandler.enqueueRegistryRecord(buffer); - break; - case REGULAR_RECORD_ID: - this.regularRecordHandler.enqueueRegularRecord(buffer); - break; - default: - this.log.error(String.format("Unknown record type: %02x", recordType)); - break; - } - } - } catch (final IOException e) { - this.log.error("Error while reading from queue " + this.queueName, e); - return false; - } catch (final InterruptedException e) { - this.log.error("Consumer was interrupted on queue " + this.queueName, e); - return false; - } catch (final ShutdownSignalException e) { - this.log.info("Consumer was shut down while waiting on queue " + this.queueName, e); - return true; - } - - return true; - } - - public void terminate(final boolean error) { - try { - this.terminated = true; - this.connection.close(); - } catch (final IOException e) { - this.log.error("IO error while trying to close the connection.", e); - } - } - - protected void deliverRecord(final IMonitoringRecord monitoringRecord) { - this.amqpReaderStage.deliverRecord(monitoringRecord); + public AMQPReader(final String uri, final String queueName, final int heartbeat, final Log log) { + this.readerLogic = new AMQPReaderLogic(uri, queueName, heartbeat, log, this); } } diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java new file mode 100644 index 0000000000000000000000000000000000000000..d2c5f223863124db441ffb5991eb946490ffa7ca --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java @@ -0,0 +1,204 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.amqp; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownSignalException; + +import kieker.analysis.plugin.reader.IReaderLogic; +import kieker.common.logging.Log; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; + +/** + * Logic module for the reader stage that reads monitoring records from an AMQP queue. + * + * @author Holger Knoche, Lars Erik Bluemke + * + * @since 1.12 + */ +public final class AMQPReaderLogic implements IReaderLogic { + + /** The name of the output port delivering the received records. */ + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + + /** ID for registry records. */ + private static final byte REGISTRY_RECORD_ID = (byte) 0xFF; + + /** ID for regular records. */ + private static final byte REGULAR_RECORD_ID = (byte) 0x01; + + private final String uri; + private final String queueName; + private final int heartbeat; + + private volatile Connection connection; + private volatile Channel channel; + private volatile QueueingConsumer consumer; + + private final ILookup<String> stringRegistry = new Lookup<String>(); + + private volatile Thread registryRecordHandlerThread; + private volatile RegistryRecordHandler registryRecordHandler; + private volatile RegularRecordHandler regularRecordHandler; + + private volatile Thread regularRecordHandlerThread; + + private volatile boolean terminated; + private volatile boolean threadsStarted; + + private final Log log; + private final AMQPReader amqpReaderStage; + + /** + * Creates a new logic module for an AMQP reader. + * + * @param uri + * The name of the configuration property for the server URI. + * @param queueName + * The name of the configuration property for the AMQP queue name. + * @param heartbeat + * The name of the configuration property for the heartbeat timeout. + * @param log + * Kieker log. + * @param amqpReaderStage + * The actual teetime stage which uses this class. + */ + public AMQPReaderLogic(final String uri, final String queueName, final int heartbeat, final Log log, final AMQPReader amqpReaderStage) { + this.uri = uri; + this.queueName = queueName; + this.heartbeat = heartbeat; + this.log = log; + this.amqpReaderStage = amqpReaderStage; + this.init(); + } + + public void init() { + try { + this.connection = this.createConnection(); + this.channel = this.connection.createChannel(); + this.consumer = new QueueingConsumer(this.channel); + + // Set up record handlers + this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry); + this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry); + + // Set up threads + this.registryRecordHandlerThread = new Thread(this.registryRecordHandler); + this.registryRecordHandlerThread.setDaemon(true); + + this.regularRecordHandlerThread = new Thread(this.regularRecordHandler); + this.regularRecordHandlerThread.setDaemon(true); + } catch (final KeyManagementException e) { + this.handleInitializationError(e); + } catch (final NoSuchAlgorithmException e) { + this.handleInitializationError(e); + } catch (final IOException e) { + this.handleInitializationError(e); + } catch (final TimeoutException e) { + this.handleInitializationError(e); + } catch (final URISyntaxException e) { + this.handleInitializationError(e); + } + + } + + private void handleInitializationError(final Throwable e) { + log.error("An error occurred initializing the AMQP reader: " + e); + } + + private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { + final ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setUri(this.uri); + connectionFactory.setRequestedHeartbeat(this.heartbeat); + + return connectionFactory.newConnection(); + } + + @Override + public boolean read() { + // Start the worker threads, if necessary + if (!this.threadsStarted) { + this.registryRecordHandlerThread.start(); + this.regularRecordHandlerThread.start(); + this.threadsStarted = true; + } + + try { + this.channel.basicConsume(this.queueName, true, this.consumer); + + while (!this.terminated) { + final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); + final byte[] body = delivery.getBody(); + + final ByteBuffer buffer = ByteBuffer.wrap(body); + final byte recordType = buffer.get(); + + // Dispatch to the appropriate handlers + switch (recordType) { + case REGISTRY_RECORD_ID: + this.registryRecordHandler.enqueueRegistryRecord(buffer); + break; + case REGULAR_RECORD_ID: + this.regularRecordHandler.enqueueRegularRecord(buffer); + break; + default: + this.log.error(String.format("Unknown record type: %02x", recordType)); + break; + } + } + } catch (final IOException e) { + this.log.error("Error while reading from queue " + this.queueName, e); + return false; + } catch (final InterruptedException e) { + this.log.error("Consumer was interrupted on queue " + this.queueName, e); + return false; + } catch (final ShutdownSignalException e) { + this.log.info("Consumer was shut down while waiting on queue " + this.queueName, e); + return true; + } + + return true; + } + + @Override + public void terminate(final boolean error) { + try { + this.terminated = true; + this.connection.close(); + } catch (final IOException e) { + this.log.error("IO error while trying to close the connection.", e); + } + } + + protected void deliverRecord(final IMonitoringRecord monitoringRecord) { + this.amqpReaderStage.deliverRecord(monitoringRecord); + } + +} diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java deleted file mode 100644 index 05c818e68c29f3dc7732d6963f03f4184f5f0712..0000000000000000000000000000000000000000 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java +++ /dev/null @@ -1,63 +0,0 @@ -/*************************************************************************** - * Copyright 2015 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.reader.amqp; - -import kieker.common.logging.Log; -import kieker.common.record.IMonitoringRecord; - -import teetime.framework.AbstractProducerStage; - -/** - * Reader stage that reads monitoring records from an AMQP queue. - * - * @author Lars Erik Bluemke - * - */ -public class AMQPReaderStage extends AbstractProducerStage<IMonitoringRecord> { - - private final AMQPReader amqpReader; - - /** - * Creates a new AMQP reader. - * - * @param uri - * The name of the configuration property for the server URI. - * @param queueName - * The name of the configuration property for the AMQP queue name. - * @param heartbeat - * The name of the configuration property for the heartbeat timeout. - */ - public AMQPReaderStage(final String uri, final String queueName, final int heartbeat, final Log log) { - this.amqpReader = new AMQPReader(uri, queueName, heartbeat, log, this); - } - - @Override - protected void execute() { - this.amqpReader.init(); - this.amqpReader.read(); - this.terminateStage(); - } - - protected void deliverRecord(final IMonitoringRecord monitoringRecord) { - this.getOutputPort().send(monitoringRecord); - } - - /** Terminates the AMQPReader by returning from read method. */ - public void terminate(final boolean error) { - this.amqpReader.terminate(error); - } -} diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java index 683f9290aacf9dc8ba1c3ef22947987cd4a05f76..6a797839672b37f1479b07e5149e853f54ab5f04 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java @@ -44,20 +44,20 @@ public class RegularRecordHandler implements Runnable { private final ILookup<String> stringRegistry; private final CachedRecordFactoryCatalog cachedRecordFactoryCatalog = CachedRecordFactoryCatalog.getInstance(); - private final AMQPReader reader; + private final AMQPReaderLogic readerLogic; private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(DEFAULT_QUEUE_SIZE); /** * Creates a new regular record handler. * - * @param reader - * The reader to send the instantiated records to + * @param readerLogic + * The reader logic class to send the instantiated records to * @param stringRegistry * The string registry to use */ - public RegularRecordHandler(final AMQPReader reader, final ILookup<String> stringRegistry) { - this.reader = reader; + public RegularRecordHandler(final AMQPReaderLogic readerLogic, final ILookup<String> stringRegistry) { + this.readerLogic = readerLogic; this.stringRegistry = stringRegistry; } @@ -98,7 +98,7 @@ public class RegularRecordHandler implements Runnable { final IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); - this.reader.deliverRecord(record); + this.readerLogic.deliverRecord(record); } catch (final RecordInstantiationException e) { LOG.error("Error instantiating record", e); } diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java index 8301faf33644fc41aa1251347136c4e4a45dc866..1f84ee9ea7131bd17a7268c23087c1056d7e72cd 100644 --- a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java +++ b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java @@ -16,206 +16,20 @@ package kieker.analysis.plugin.reader.jms; -import java.io.Serializable; -import java.util.Hashtable; -import java.util.concurrent.CountDownLatch; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageFormatException; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; - +import kieker.analysis.plugin.reader.AbstractReader; import kieker.common.logging.Log; -import kieker.common.record.IMonitoringRecord; /** - * Reads monitoring records from a (remote or local) JMS queue. - * - * - * @author Andre van Hoorn, Matthias Rohr, Lars Erik Bluemke + * Reads monitoring records from a (remote or local) JMS queue by using the + * read-method from JMSReaderLogicModule. JMSReaderLogicModule also delivers + * read records to the output port. * - * @since 0.95a + * @author Lars Erik Bluemke */ -public final class JMSReader { - - private final String jmsProviderUrl; - private final String jmsDestination; - private final String jmsFactoryLookupName; - private final CountDownLatch cdLatch = new CountDownLatch(1); - - private final Log log; - private final JMSReaderStage jmsReaderStage; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param jmsProviderUrl - * The name of the configuration determining the JMS provider URL, - * e.g. {@code tcp://localhost:3035/} - * @param jmsDestination - * The name of the configuration determining the JMS destination, - * e.g. {@code queue1}. - * @param jmsFactoryLookupName - * The name of the configuration determining the name of the used JMS factory, - * e.g. {@code org.exolab.jms.jndi.InitialContextFactory}. - * @param log - * Kieker log. - * @param jmsReaderStage - * The actual teetime stage which uses this class. - * - * @throws IllegalArgumentException - * If one of the properties is empty. - */ - public JMSReader(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log, - final JMSReaderStage jmsReaderStage) - throws IllegalArgumentException { - - // Initialize the reader bases - this.jmsProviderUrl = jmsProviderUrl; - this.jmsDestination = jmsDestination; - this.jmsFactoryLookupName = jmsFactoryLookupName; - this.log = log; - this.jmsReaderStage = jmsReaderStage; - - // simple sanity check - if ((this.jmsProviderUrl.length() == 0) || (this.jmsDestination.length() == 0) || (this.jmsFactoryLookupName.length() == 0)) { - throw new IllegalArgumentException("JMSReader has not sufficient parameters. jmsProviderUrl ('" + this.jmsProviderUrl + "'), jmsDestination ('" - + this.jmsDestination + "'), or factoryLookupName ('" + this.jmsFactoryLookupName + "') is null"); - } - } - - /** - * A call to this method is a blocking call. - * - * @return true if the method succeeds, false otherwise. - */ - public boolean read() { - boolean retVal = true; - Connection connection = null; - try { - final Hashtable<String, String> properties = new Hashtable<String, String>(); // NOPMD NOCS (InitialContext expects Hashtable) - properties.put(Context.INITIAL_CONTEXT_FACTORY, this.jmsFactoryLookupName); - - // JMS initialization - properties.put(Context.PROVIDER_URL, this.jmsProviderUrl); - final Context context = new InitialContext(properties); - final ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory"); - connection = factory.createConnection(); - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination destination; - try { - // As a first step, try a JNDI lookup (this seems to fail with ActiveMQ sometimes) - destination = (Destination) context.lookup(this.jmsDestination); - } catch (final NameNotFoundException exc) { - // JNDI lookup failed, try manual creation (this seems to fail with ActiveMQ/HornetQ sometimes) - destination = session.createQueue(this.jmsDestination); - if (destination == null) { // - this.log.error("Failed to lookup queue '" + this.jmsDestination + "' via JNDI: " + exc.getMessage() + " AND failed to create queue"); - throw exc; // will be catched below to abort the read method - } - } - - this.log.info("Listening to destination:" + destination + " at " + this.jmsProviderUrl + " !\n***\n\n"); - final MessageConsumer receiver = session.createConsumer(destination); - receiver.setMessageListener(new JMSMessageListener()); - - // start the connection to enable message delivery - connection.start(); +public class JMSReader extends AbstractReader { - this.log.info("JMSReader started and waits for incoming monitoring events!"); - this.block(); - this.log.info("Woke up by shutdown"); - } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) - this.log.error("Error in read()", ex); - retVal = false; - } finally { - try { - if (connection != null) { - connection.close(); - } - } catch (final JMSException ex) { - this.log.error("Failed to close JMS", ex); - } - } - return retVal; + public JMSReader(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log) { + this.readerLogic = new JMSReaderLogic(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, log, this); } - private final void block() { - Runtime.getRuntime().addShutdownHook(new Thread() { - - @Override - public final void run() { - JMSReader.this.unblock(); - } - }); - try { - this.cdLatch.await(); - } catch (final InterruptedException e) { // ignore - } - } - - final void unblock() { // NOPMD (package visible for inner class) - this.cdLatch.countDown(); - } - - final void deliverIndirect(final IMonitoringRecord data) { // NOPMD (package visible for inner class) - jmsReaderStage.deliverRecord(data); - } - - /** - * {@inheritDoc} - */ - public void terminate(final boolean error) { - this.log.info("Shutdown of JMSReader requested."); - this.unblock(); - } - - protected Log getLog() { - return log; - } - - /** - * The MessageListener will read onMessage each time a message comes in. - */ - private final class JMSMessageListener implements MessageListener { - - public JMSMessageListener() { - // empty default constructor - } - - @Override - public void onMessage(final Message jmsMessage) { - if (jmsMessage == null) { - JMSReader.this.getLog().warn("Received null message"); - } else { - if (jmsMessage instanceof ObjectMessage) { - try { - final ObjectMessage om = (ObjectMessage) jmsMessage; - final Serializable omo = om.getObject(); - if ((omo instanceof IMonitoringRecord)) { - JMSReader.this.deliverIndirect((IMonitoringRecord) omo); - } - } catch (final MessageFormatException ex) { - JMSReader.this.getLog().error("Error delivering record", ex); - } catch (final JMSException ex) { - JMSReader.this.getLog().error("Error delivering record", ex); - } catch (final Exception ex) { // NOPMD NOCS (catch Exception) - JMSReader.this.getLog().error("Error delivering record", ex); - } - } else { - JMSReader.this.getLog().warn("Received message of invalid type: " + jmsMessage.getClass().getName()); - } - } - } - } } diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java new file mode 100644 index 0000000000000000000000000000000000000000..741588bd3216db9204e139cee72ed8ec8a6d8950 --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java @@ -0,0 +1,224 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.jms; + +import java.io.Serializable; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageFormatException; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; + +import kieker.analysis.plugin.reader.IReaderLogic; +import kieker.common.logging.Log; +import kieker.common.record.IMonitoringRecord; + +/** + * Reads monitoring records from a (remote or local) JMS queue. + * + * + * @author Andre van Hoorn, Matthias Rohr, Lars Erik Bluemke + * + * @since 0.95a + */ +public final class JMSReaderLogic implements IReaderLogic { + + private final String jmsProviderUrl; + private final String jmsDestination; + private final String jmsFactoryLookupName; + private final CountDownLatch cdLatch = new CountDownLatch(1); + + private final Log log; + private final JMSReader jmsReaderStage; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param jmsProviderUrl + * The name of the configuration determining the JMS provider URL, + * e.g. {@code tcp://localhost:3035/} + * @param jmsDestination + * The name of the configuration determining the JMS destination, + * e.g. {@code queue1}. + * @param jmsFactoryLookupName + * The name of the configuration determining the name of the used JMS factory, + * e.g. {@code org.exolab.jms.jndi.InitialContextFactory}. + * @param log + * Kieker log. + * @param jmsReaderStage + * The actual teetime stage which uses this class. + * + * @throws IllegalArgumentException + * If one of the properties is empty. + */ + public JMSReaderLogic(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log, + final JMSReader jmsReaderStage) + throws IllegalArgumentException { + + // Initialize the reader bases + this.jmsProviderUrl = jmsProviderUrl; + this.jmsDestination = jmsDestination; + this.jmsFactoryLookupName = jmsFactoryLookupName; + this.log = log; + this.jmsReaderStage = jmsReaderStage; + + // simple sanity check + if ((this.jmsProviderUrl.length() == 0) || (this.jmsDestination.length() == 0) || (this.jmsFactoryLookupName.length() == 0)) { + throw new IllegalArgumentException("JMSReader has not sufficient parameters. jmsProviderUrl ('" + this.jmsProviderUrl + "'), jmsDestination ('" + + this.jmsDestination + "'), or factoryLookupName ('" + this.jmsFactoryLookupName + "') is null"); + } + } + + /** + * A call to this method is a blocking call. + * + * @return true if the method succeeds, false otherwise. + */ + @Override + public boolean read() { + boolean retVal = true; + Connection connection = null; + try { + final Hashtable<String, String> properties = new Hashtable<String, String>(); // NOPMD NOCS (InitialContext expects Hashtable) + properties.put(Context.INITIAL_CONTEXT_FACTORY, this.jmsFactoryLookupName); + + // JMS initialization + properties.put(Context.PROVIDER_URL, this.jmsProviderUrl); + final Context context = new InitialContext(properties); + final ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory"); + connection = factory.createConnection(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination; + try { + // As a first step, try a JNDI lookup (this seems to fail with ActiveMQ sometimes) + destination = (Destination) context.lookup(this.jmsDestination); + } catch (final NameNotFoundException exc) { + // JNDI lookup failed, try manual creation (this seems to fail with ActiveMQ/HornetQ sometimes) + destination = session.createQueue(this.jmsDestination); + if (destination == null) { // + this.log.error("Failed to lookup queue '" + this.jmsDestination + "' via JNDI: " + exc.getMessage() + " AND failed to create queue"); + throw exc; // will be catched below to abort the read method + } + } + + this.log.info("Listening to destination:" + destination + " at " + this.jmsProviderUrl + " !\n***\n\n"); + final MessageConsumer receiver = session.createConsumer(destination); + receiver.setMessageListener(new JMSMessageListener()); + + // start the connection to enable message delivery + connection.start(); + + this.log.info("JMSReader started and waits for incoming monitoring events!"); + this.block(); + this.log.info("Woke up by shutdown"); + } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) + this.log.error("Error in read()", ex); + retVal = false; + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (final JMSException ex) { + this.log.error("Failed to close JMS", ex); + } + } + return retVal; + } + + private final void block() { + Runtime.getRuntime().addShutdownHook(new Thread() { + + @Override + public final void run() { + JMSReaderLogic.this.unblock(); + } + }); + try { + this.cdLatch.await(); + } catch (final InterruptedException e) { // ignore + } + } + + final void unblock() { // NOPMD (package visible for inner class) + this.cdLatch.countDown(); + } + + final void deliverIndirect(final IMonitoringRecord data) { // NOPMD (package visible for inner class) + jmsReaderStage.deliverRecord(data); + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + this.log.info("Shutdown of JMSReader requested."); + this.unblock(); + } + + protected Log getLog() { + return log; + } + + /** + * The MessageListener will read onMessage each time a message comes in. + */ + private final class JMSMessageListener implements MessageListener { + + public JMSMessageListener() { + // empty default constructor + } + + @Override + public void onMessage(final Message jmsMessage) { + if (jmsMessage == null) { + JMSReaderLogic.this.getLog().warn("Received null message"); + } else { + if (jmsMessage instanceof ObjectMessage) { + try { + final ObjectMessage om = (ObjectMessage) jmsMessage; + final Serializable omo = om.getObject(); + if ((omo instanceof IMonitoringRecord)) { + JMSReaderLogic.this.deliverIndirect((IMonitoringRecord) omo); + } + } catch (final MessageFormatException ex) { + JMSReaderLogic.this.getLog().error("Error delivering record", ex); + } catch (final JMSException ex) { + JMSReaderLogic.this.getLog().error("Error delivering record", ex); + } catch (final Exception ex) { // NOPMD NOCS (catch Exception) + JMSReaderLogic.this.getLog().error("Error delivering record", ex); + } + } else { + JMSReaderLogic.this.getLog().warn("Received message of invalid type: " + jmsMessage.getClass().getName()); + } + } + } + } +} diff --git a/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java similarity index 98% rename from src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java rename to src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java index 91da1d9692339bc7c478a1e89052dd02a219b9c2..833ac3b334038b6f7bde9b26cb489cb12052c685 100644 --- a/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java +++ b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java @@ -30,7 +30,7 @@ import kieker.common.util.registry.IMonitoringRecordReceiver; import kieker.common.util.registry.IRegistry; import kieker.common.util.registry.Registry; -public class AMQPReaderStageTest { +public class AMQPReaderTest { private AMQPTestReaderThread amqpReaderThread = null; private AMQPTestWriter amqpWriter = null; @@ -217,12 +217,12 @@ final class AMQPTestReaderThread extends Thread { private static final Log LOG = LogFactory.getLog(AMQPTestReaderThread.class); - private final AMQPReaderStage amqpReaderStage; + private final AMQPReader amqpReaderStage; private final List<IMonitoringRecord> outputList = new LinkedList<>(); public AMQPTestReaderThread(final String uri, final int heartbeat, final String exchangeName, final String queueName) { - amqpReaderStage = new AMQPReaderStage(uri, queueName, heartbeat, LOG); + amqpReaderStage = new AMQPReader(uri, queueName, heartbeat, LOG); } @Override diff --git a/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java b/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java similarity index 96% rename from src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java rename to src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java index a6cfadccbdaa7c1ac1845b9777a31f1305caa15b..7fa3d00e73019179f236975fb2e7d01e9db87703 100644 --- a/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java +++ b/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java @@ -30,7 +30,7 @@ import kieker.common.logging.LogFactory; import kieker.common.record.IMonitoringRecord; import kieker.common.record.system.CPUUtilizationRecord; -public class JMSReaderStageTest { +public class JMSReaderTest { private CPUUtilizationRecord monitoringRecord; private JMSTestReaderThread jmsReaderThread; @@ -119,7 +119,7 @@ public class JMSReaderStageTest { */ final class JMSTestWriter { - private static final Log LOG = LogFactory.getLog(JMSReaderStageTest.class); + private static final Log LOG = LogFactory.getLog(JMSReaderTest.class); private final Connection connection; private final Session session; @@ -184,12 +184,12 @@ final class JMSTestReaderThread extends Thread { private static final Log LOG = LogFactory.getLog(JMSTestReaderThread.class); - private final JMSReaderStage jmsReaderStage; + private final JMSReader jmsReaderStage; private final List<IMonitoringRecord> outputList = new LinkedList<>(); public JMSTestReaderThread(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName) { - jmsReaderStage = new JMSReaderStage(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, LOG); + jmsReaderStage = new JMSReader(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, LOG); } @Override