From 2524e7faa2d2e728d68ba16e101328416c6280fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lars=20Blu=CC=88mke?= <lbl@informatik.uni-kiel.de>
Date: Thu, 28 Apr 2016 15:08:03 +0200
Subject: [PATCH] exported jms reader logic to a new class

---
 .../reader/jms/JMSReaderLogicModule.java      | 222 ++++++++++++++++++
 1 file changed, 222 insertions(+)
 create mode 100644 src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogicModule.java

diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogicModule.java b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogicModule.java
new file mode 100644
index 00000000..a1ae3fbb
--- /dev/null
+++ b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogicModule.java
@@ -0,0 +1,222 @@
+/***************************************************************************
+ * Copyright 2015 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package kieker.analysis.plugin.reader.jms;
+
+import java.io.Serializable;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import kieker.common.logging.Log;
+import kieker.common.record.IMonitoringRecord;
+
+/**
+ * Reads monitoring records from a (remote or local) JMS queue.
+ *
+ *
+ * @author Andre van Hoorn, Matthias Rohr, Lars Erik Bluemke
+ *
+ * @since 0.95a
+ */
+public final class JMSReaderLogicModule {
+
+	private final String jmsProviderUrl;
+	private final String jmsDestination;
+	private final String jmsFactoryLookupName;
+	private final CountDownLatch cdLatch = new CountDownLatch(1);
+
+	private final Log log;
+	private final JMSReader producerStage;
+
+	/**
+	 * Creates a new instance of this class using the given parameters.
+	 *
+	 * @param jmsProviderUrl
+	 *            The name of the configuration determining the JMS provider URL,
+	 *            e.g. {@code tcp://localhost:3035/}
+	 * @param jmsDestination
+	 *            The name of the configuration determining the JMS destination,
+	 *            e.g. {@code queue1}.
+	 * @param jmsFactoryLookupName
+	 *            The name of the configuration determining the name of the used JMS factory,
+	 *            e.g. {@code org.exolab.jms.jndi.InitialContextFactory}.
+	 * @param log
+	 *            Kieker log.
+	 * @param producerStage
+	 *            The actual teetime stage which uses this class.
+	 *
+	 * @throws IllegalArgumentException
+	 *             If one of the properties is empty.
+	 */
+	public JMSReaderLogicModule(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log,
+			final JMSReader producerStage)
+			throws IllegalArgumentException {
+
+		// Initialize the reader bases
+		this.jmsProviderUrl = jmsProviderUrl;
+		this.jmsDestination = jmsDestination;
+		this.jmsFactoryLookupName = jmsFactoryLookupName;
+		this.log = log;
+		this.producerStage = producerStage;
+
+		// simple sanity check
+		if ((this.jmsProviderUrl.length() == 0) || (this.jmsDestination.length() == 0) || (this.jmsFactoryLookupName.length() == 0)) {
+			throw new IllegalArgumentException("JMSReader has not sufficient parameters. jmsProviderUrl ('" + this.jmsProviderUrl + "'), jmsDestination ('"
+					+ this.jmsDestination + "'), or factoryLookupName ('" + this.jmsFactoryLookupName + "') is null");
+		}
+	}
+
+	/**
+	 * A call to this method is a blocking call.
+	 *
+	 * @return true if the method succeeds, false otherwise.
+	 */
+	public boolean read() {
+		boolean retVal = true;
+		Connection connection = null;
+		try {
+			final Hashtable<String, String> properties = new Hashtable<String, String>(); // NOPMD NOCS (InitialContext expects Hashtable)
+			properties.put(Context.INITIAL_CONTEXT_FACTORY, this.jmsFactoryLookupName);
+
+			// JMS initialization
+			properties.put(Context.PROVIDER_URL, this.jmsProviderUrl);
+			final Context context = new InitialContext(properties);
+			final ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory");
+			connection = factory.createConnection();
+			final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+			Destination destination;
+			try {
+				// As a first step, try a JNDI lookup (this seems to fail with ActiveMQ sometimes)
+				destination = (Destination) context.lookup(this.jmsDestination);
+			} catch (final NameNotFoundException exc) {
+				// JNDI lookup failed, try manual creation (this seems to fail with ActiveMQ/HornetQ sometimes)
+				destination = session.createQueue(this.jmsDestination);
+				if (destination == null) { //
+					this.log.error("Failed to lookup queue '" + this.jmsDestination + "' via JNDI: " + exc.getMessage() + " AND failed to create queue");
+					throw exc; // will be catched below to abort the read method
+				}
+			}
+
+			this.log.info("Listening to destination:" + destination + " at " + this.jmsProviderUrl + " !\n***\n\n");
+			final MessageConsumer receiver = session.createConsumer(destination);
+			receiver.setMessageListener(new JMSMessageListener());
+
+			// start the connection to enable message delivery
+			connection.start();
+
+			this.log.info("JMSReader started and waits for incoming monitoring events!");
+			this.block();
+			this.log.info("Woke up by shutdown");
+		} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
+			this.log.error("Error in read()", ex);
+			retVal = false;
+		} finally {
+			try {
+				if (connection != null) {
+					connection.close();
+				}
+			} catch (final JMSException ex) {
+				this.log.error("Failed to close JMS", ex);
+			}
+		}
+		return retVal;
+	}
+
+	private final void block() {
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+
+			@Override
+			public final void run() {
+				JMSReaderLogicModule.this.unblock();
+			}
+		});
+		try {
+			this.cdLatch.await();
+		} catch (final InterruptedException e) { // ignore
+		}
+	}
+
+	final void unblock() { // NOPMD (package visible for inner class)
+		this.cdLatch.countDown();
+	}
+
+	final void deliverIndirect(final Object data) { // NOPMD (package visible for inner class)
+		producerStage.getOutputPort().send(data);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	public void terminate(final boolean error) {
+		this.log.info("Shutdown of JMSReader requested.");
+		this.unblock();
+	}
+
+	protected Log getLog() {
+		return log;
+	}
+
+	/**
+	 * The MessageListener will read onMessage each time a message comes in.
+	 */
+	private final class JMSMessageListener implements MessageListener {
+
+		public JMSMessageListener() {
+			// empty default constructor
+		}
+
+		@Override
+		public void onMessage(final Message jmsMessage) {
+			if (jmsMessage == null) {
+				JMSReaderLogicModule.this.getLog().warn("Received null message");
+			} else {
+				if (jmsMessage instanceof ObjectMessage) {
+					try {
+						final ObjectMessage om = (ObjectMessage) jmsMessage;
+						final Serializable omo = om.getObject();
+						if ((omo instanceof IMonitoringRecord)) {
+							JMSReaderLogicModule.this.deliverIndirect(omo);
+						}
+					} catch (final MessageFormatException ex) {
+						JMSReaderLogicModule.this.getLog().error("Error delivering record", ex);
+					} catch (final JMSException ex) {
+						JMSReaderLogicModule.this.getLog().error("Error delivering record", ex);
+					} catch (final Exception ex) { // NOPMD NOCS (catch Exception)
+						JMSReaderLogicModule.this.getLog().error("Error delivering record", ex);
+					}
+				} else {
+					JMSReaderLogicModule.this.getLog().warn("Received message of invalid type: " + jmsMessage.getClass().getName());
+				}
+			}
+		}
+	}
+
+}
-- 
GitLab