From 3db1691acd27cc3a8b2c10ee578350613767493a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lars=20Blu=CC=88mke?= <lbl@informatik.uni-kiel.de>
Date: Fri, 26 Aug 2016 14:22:35 +0200
Subject: [PATCH] restructuring of the readers

---
 ...MSReaderStage.java => AbstractReader.java} |  30 ++-
 .../analysis/plugin/reader/IReaderLogic.java  |  56 +++++
 .../plugin/reader/amqp/AMQPReader.java        | 170 +------------
 .../plugin/reader/amqp/AMQPReaderLogic.java   | 204 ++++++++++++++++
 .../plugin/reader/amqp/AMQPReaderStage.java   |  63 -----
 .../reader/amqp/RegularRecordHandler.java     |  12 +-
 .../analysis/plugin/reader/jms/JMSReader.java | 202 +---------------
 .../plugin/reader/jms/JMSReaderLogic.java     | 224 ++++++++++++++++++
 ...aderStageTest.java => AMQPReaderTest.java} |   6 +-
 ...eaderStageTest.java => JMSReaderTest.java} |   8 +-
 10 files changed, 526 insertions(+), 449 deletions(-)
 rename src/main/java/kieker/analysis/plugin/reader/{jms/JMSReaderStage.java => AbstractReader.java} (57%)
 create mode 100644 src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java
 create mode 100644 src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java
 delete mode 100644 src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java
 create mode 100644 src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java
 rename src/test/java/kieker/analysis/plugin/reader/amqp/{AMQPReaderStageTest.java => AMQPReaderTest.java} (98%)
 rename src/test/java/kieker/analysis/plugin/reader/jms/{JMSReaderStageTest.java => JMSReaderTest.java} (96%)

diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java b/src/main/java/kieker/analysis/plugin/reader/AbstractReader.java
similarity index 57%
rename from src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java
rename to src/main/java/kieker/analysis/plugin/reader/AbstractReader.java
index dcdec0ef..71d76878 100644
--- a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderStage.java
+++ b/src/main/java/kieker/analysis/plugin/reader/AbstractReader.java
@@ -14,40 +14,38 @@
  * limitations under the License.
  ***************************************************************************/
 
-package kieker.analysis.plugin.reader.jms;
+package kieker.analysis.plugin.reader;
 
-import kieker.common.logging.Log;
+import kieker.analysis.plugin.annotation.Plugin;
 import kieker.common.record.IMonitoringRecord;
 
 import teetime.framework.AbstractProducerStage;
 
 /**
- * Reads monitoring records from a (remote or local) JMS queue by using the
- * read-method from JMSReaderLogicModule. JMSReaderLogicModule also delivers
- * read records to the output port.
+ * This abstract class defines the basic structure of each reader. Each reader has its reader logic separated
+ * from the actual TeeTime stage. The reader logic is mainly based on the original Kieker stages.
  *
  * @author Lars Erik Bluemke
  */
-public class JMSReaderStage extends AbstractProducerStage<IMonitoringRecord> {
+@Plugin
+public abstract class AbstractReader extends AbstractProducerStage<IMonitoringRecord> {
 
-	private final JMSReader jmsReader;
-
-	public JMSReaderStage(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log) {
-		this.jmsReader = new JMSReader(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, log, this);
-	}
+	protected IReaderLogic readerLogic;
 
 	@Override
 	protected void execute() {
-		jmsReader.read();
-		this.terminateStage();
+		readerLogic.read();
 	}
 
-	protected void deliverRecord(final IMonitoringRecord monitoringRecord) {
+	/** Called from reader logic to send the read records to the output port */
+	public void deliverRecord(final IMonitoringRecord monitoringRecord) {
 		this.getOutputPort().send(monitoringRecord);
 	}
 
-	/** Terminates the JMSReader by returning from read method. */
+	/** Terminates the reader logic by returning from read method. */
 	public void terminate(final boolean error) {
-		this.jmsReader.terminate(error);
+		readerLogic.terminate(error);
+		this.terminateStage();
 	}
+
 }
diff --git a/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java
new file mode 100644
index 00000000..dd50ec2f
--- /dev/null
+++ b/src/main/java/kieker/analysis/plugin/reader/IReaderLogic.java
@@ -0,0 +1,56 @@
+/***************************************************************************
+ * Copyright 2015 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package kieker.analysis.plugin.reader;
+
+/**
+ * This is the interface for reader plugins.
+ *
+ * @author Andre van Hoorn
+ *
+ * @since 0.95a
+ */
+public interface IReaderLogic {
+
+	/**
+	 * Starts the reader. This method is intended to be a blocking operation,
+	 * i.e., it is assumed that reading has finished before this method returns.
+	 * The method should indicate an error by the return value false.
+	 *
+	 * In asynchronous scenarios, the {@link kieker.analysis.plugin.IPlugin#terminate(boolean)} method can be used
+	 * to initiate the termination of this method.
+	 *
+	 * @return true if reading was successful; false if an error occurred
+	 *
+	 * @since 1.2
+	 */
+	public boolean read();
+
+	/**
+	 * Initiates a termination of the plugin. This method is only used by the
+	 * framework and should not be called manually.
+	 * Use the method {@link kieker.analysis.AnalysisController#terminate(boolean)} instead.
+	 *
+	 * After receiving this notification, the plugin should terminate any running
+	 * methods, e.g., read for readers.
+	 *
+	 * @param error
+	 *            Determines whether the plugin is terminated due to an error or not.
+	 *
+	 * @since 1.6
+	 */
+	public void terminate(final boolean error);
+}
diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java
index 92ae48fb..59575fde 100644
--- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java
+++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReader.java
@@ -16,66 +16,19 @@
 
 package kieker.analysis.plugin.reader.amqp;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.TimeoutException;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownSignalException;
-
+import kieker.analysis.plugin.reader.AbstractReader;
 import kieker.common.logging.Log;
-import kieker.common.record.IMonitoringRecord;
-import kieker.common.util.registry.ILookup;
-import kieker.common.util.registry.Lookup;
 
 /**
- * Logic module for the reader stage that reads monitoring records from an AMQP queue.
+ * Reader stage that reads monitoring records from an AMQP queue.
  *
- * @author Holger Knoche, Lars Erik Bluemke
+ * @author Lars Erik Bluemke
  *
- * @since 1.12
  */
-public final class AMQPReader {
-
-	/** The name of the output port delivering the received records. */
-	public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
-
-	/** ID for registry records. */
-	private static final byte REGISTRY_RECORD_ID = (byte) 0xFF;
-
-	/** ID for regular records. */
-	private static final byte REGULAR_RECORD_ID = (byte) 0x01;
-
-	private final String uri;
-	private final String queueName;
-	private final int heartbeat;
-
-	private volatile Connection connection;
-	private volatile Channel channel;
-	private volatile QueueingConsumer consumer;
-
-	private final ILookup<String> stringRegistry = new Lookup<String>();
-
-	private volatile Thread registryRecordHandlerThread;
-	private volatile RegistryRecordHandler registryRecordHandler;
-	private volatile RegularRecordHandler regularRecordHandler;
-
-	private volatile Thread regularRecordHandlerThread;
-
-	private volatile boolean terminated;
-	private volatile boolean threadsStarted;
-
-	private final Log log;
-	private final AMQPReaderStage amqpReaderStage;
+public class AMQPReader extends AbstractReader {
 
 	/**
-	 * Creates a new logic module for an AMQP reader.
+	 * Creates a new AMQP reader.
 	 *
 	 * @param uri
 	 *            The name of the configuration property for the server URI.
@@ -83,118 +36,9 @@ public final class AMQPReader {
 	 *            The name of the configuration property for the AMQP queue name.
 	 * @param heartbeat
 	 *            The name of the configuration property for the heartbeat timeout.
-	 * @param log
-	 *            Kieker log.
-	 * @param amqpReaderStage
-	 *            The actual teetime stage which uses this class.
 	 */
-	public AMQPReader(final String uri, final String queueName, final int heartbeat, final Log log, final AMQPReaderStage amqpReaderStage) {
-		this.uri = uri;
-		this.queueName = queueName;
-		this.heartbeat = heartbeat;
-		this.log = log;
-		this.amqpReaderStage = amqpReaderStage;
-	}
-
-	public void init() {
-		try {
-			this.connection = this.createConnection();
-			this.channel = this.connection.createChannel();
-			this.consumer = new QueueingConsumer(this.channel);
-
-			// Set up record handlers
-			this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry);
-			this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry);
-
-			// Set up threads
-			this.registryRecordHandlerThread = new Thread(this.registryRecordHandler);
-			this.registryRecordHandlerThread.setDaemon(true);
-
-			this.regularRecordHandlerThread = new Thread(this.regularRecordHandler);
-			this.regularRecordHandlerThread.setDaemon(true);
-		} catch (final KeyManagementException e) {
-			this.handleInitializationError(e);
-		} catch (final NoSuchAlgorithmException e) {
-			this.handleInitializationError(e);
-		} catch (final IOException e) {
-			this.handleInitializationError(e);
-		} catch (final TimeoutException e) {
-			this.handleInitializationError(e);
-		} catch (final URISyntaxException e) {
-			this.handleInitializationError(e);
-		}
-
-	}
-
-	private void handleInitializationError(final Throwable e) {
-		log.error("An error occurred initializing the AMQP reader: " + e);
-	}
-
-	private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
-		final ConnectionFactory connectionFactory = new ConnectionFactory();
-
-		connectionFactory.setUri(this.uri);
-		connectionFactory.setRequestedHeartbeat(this.heartbeat);
-
-		return connectionFactory.newConnection();
-	}
-
-	public boolean read() {
-		// Start the worker threads, if necessary
-		if (!this.threadsStarted) {
-			this.registryRecordHandlerThread.start();
-			this.regularRecordHandlerThread.start();
-			this.threadsStarted = true;
-		}
-
-		try {
-			this.channel.basicConsume(this.queueName, true, this.consumer);
-
-			while (!this.terminated) {
-				final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
-				final byte[] body = delivery.getBody();
-
-				final ByteBuffer buffer = ByteBuffer.wrap(body);
-				final byte recordType = buffer.get();
-
-				// Dispatch to the appropriate handlers
-				switch (recordType) {
-				case REGISTRY_RECORD_ID:
-					this.registryRecordHandler.enqueueRegistryRecord(buffer);
-					break;
-				case REGULAR_RECORD_ID:
-					this.regularRecordHandler.enqueueRegularRecord(buffer);
-					break;
-				default:
-					this.log.error(String.format("Unknown record type: %02x", recordType));
-					break;
-				}
-			}
-		} catch (final IOException e) {
-			this.log.error("Error while reading from queue " + this.queueName, e);
-			return false;
-		} catch (final InterruptedException e) {
-			this.log.error("Consumer was interrupted on queue " + this.queueName, e);
-			return false;
-		} catch (final ShutdownSignalException e) {
-			this.log.info("Consumer was shut down while waiting on queue " + this.queueName, e);
-			return true;
-		}
-
-		return true;
-	}
-
-	public void terminate(final boolean error) {
-		try {
-			this.terminated = true;
-			this.connection.close();
-		} catch (final IOException e) {
-			this.log.error("IO error while trying to close the connection.", e);
-		}
-	}
-
-	protected void deliverRecord(final IMonitoringRecord monitoringRecord) {
-		this.amqpReaderStage.deliverRecord(monitoringRecord);
+	public AMQPReader(final String uri, final String queueName, final int heartbeat, final Log log) {
+		this.readerLogic = new AMQPReaderLogic(uri, queueName, heartbeat, log, this);
 	}
 
 }
diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java
new file mode 100644
index 00000000..d2c5f223
--- /dev/null
+++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogic.java
@@ -0,0 +1,204 @@
+/***************************************************************************
+ * Copyright 2015 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package kieker.analysis.plugin.reader.amqp;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
+
+import kieker.analysis.plugin.reader.IReaderLogic;
+import kieker.common.logging.Log;
+import kieker.common.record.IMonitoringRecord;
+import kieker.common.util.registry.ILookup;
+import kieker.common.util.registry.Lookup;
+
+/**
+ * Logic module for the reader stage that reads monitoring records from an AMQP queue.
+ *
+ * @author Holger Knoche, Lars Erik Bluemke
+ *
+ * @since 1.12
+ */
+public final class AMQPReaderLogic implements IReaderLogic {
+
+	/** The name of the output port delivering the received records. */
+	public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
+
+	/** ID for registry records. */
+	private static final byte REGISTRY_RECORD_ID = (byte) 0xFF;
+
+	/** ID for regular records. */
+	private static final byte REGULAR_RECORD_ID = (byte) 0x01;
+
+	private final String uri;
+	private final String queueName;
+	private final int heartbeat;
+
+	private volatile Connection connection;
+	private volatile Channel channel;
+	private volatile QueueingConsumer consumer;
+
+	private final ILookup<String> stringRegistry = new Lookup<String>();
+
+	private volatile Thread registryRecordHandlerThread;
+	private volatile RegistryRecordHandler registryRecordHandler;
+	private volatile RegularRecordHandler regularRecordHandler;
+
+	private volatile Thread regularRecordHandlerThread;
+
+	private volatile boolean terminated;
+	private volatile boolean threadsStarted;
+
+	private final Log log;
+	private final AMQPReader amqpReaderStage;
+
+	/**
+	 * Creates a new logic module for an AMQP reader.
+	 *
+	 * @param uri
+	 *            The name of the configuration property for the server URI.
+	 * @param queueName
+	 *            The name of the configuration property for the AMQP queue name.
+	 * @param heartbeat
+	 *            The name of the configuration property for the heartbeat timeout.
+	 * @param log
+	 *            Kieker log.
+	 * @param amqpReaderStage
+	 *            The actual teetime stage which uses this class.
+	 */
+	public AMQPReaderLogic(final String uri, final String queueName, final int heartbeat, final Log log, final AMQPReader amqpReaderStage) {
+		this.uri = uri;
+		this.queueName = queueName;
+		this.heartbeat = heartbeat;
+		this.log = log;
+		this.amqpReaderStage = amqpReaderStage;
+		this.init();
+	}
+
+	public void init() {
+		try {
+			this.connection = this.createConnection();
+			this.channel = this.connection.createChannel();
+			this.consumer = new QueueingConsumer(this.channel);
+
+			// Set up record handlers
+			this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry);
+			this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry);
+
+			// Set up threads
+			this.registryRecordHandlerThread = new Thread(this.registryRecordHandler);
+			this.registryRecordHandlerThread.setDaemon(true);
+
+			this.regularRecordHandlerThread = new Thread(this.regularRecordHandler);
+			this.regularRecordHandlerThread.setDaemon(true);
+		} catch (final KeyManagementException e) {
+			this.handleInitializationError(e);
+		} catch (final NoSuchAlgorithmException e) {
+			this.handleInitializationError(e);
+		} catch (final IOException e) {
+			this.handleInitializationError(e);
+		} catch (final TimeoutException e) {
+			this.handleInitializationError(e);
+		} catch (final URISyntaxException e) {
+			this.handleInitializationError(e);
+		}
+
+	}
+
+	private void handleInitializationError(final Throwable e) {
+		log.error("An error occurred initializing the AMQP reader: " + e);
+	}
+
+	private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
+		final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+		connectionFactory.setUri(this.uri);
+		connectionFactory.setRequestedHeartbeat(this.heartbeat);
+
+		return connectionFactory.newConnection();
+	}
+
+	@Override
+	public boolean read() {
+		// Start the worker threads, if necessary
+		if (!this.threadsStarted) {
+			this.registryRecordHandlerThread.start();
+			this.regularRecordHandlerThread.start();
+			this.threadsStarted = true;
+		}
+
+		try {
+			this.channel.basicConsume(this.queueName, true, this.consumer);
+
+			while (!this.terminated) {
+				final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
+				final byte[] body = delivery.getBody();
+
+				final ByteBuffer buffer = ByteBuffer.wrap(body);
+				final byte recordType = buffer.get();
+
+				// Dispatch to the appropriate handlers
+				switch (recordType) {
+				case REGISTRY_RECORD_ID:
+					this.registryRecordHandler.enqueueRegistryRecord(buffer);
+					break;
+				case REGULAR_RECORD_ID:
+					this.regularRecordHandler.enqueueRegularRecord(buffer);
+					break;
+				default:
+					this.log.error(String.format("Unknown record type: %02x", recordType));
+					break;
+				}
+			}
+		} catch (final IOException e) {
+			this.log.error("Error while reading from queue " + this.queueName, e);
+			return false;
+		} catch (final InterruptedException e) {
+			this.log.error("Consumer was interrupted on queue " + this.queueName, e);
+			return false;
+		} catch (final ShutdownSignalException e) {
+			this.log.info("Consumer was shut down while waiting on queue " + this.queueName, e);
+			return true;
+		}
+
+		return true;
+	}
+
+	@Override
+	public void terminate(final boolean error) {
+		try {
+			this.terminated = true;
+			this.connection.close();
+		} catch (final IOException e) {
+			this.log.error("IO error while trying to close the connection.", e);
+		}
+	}
+
+	protected void deliverRecord(final IMonitoringRecord monitoringRecord) {
+		this.amqpReaderStage.deliverRecord(monitoringRecord);
+	}
+
+}
diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java
deleted file mode 100644
index 05c818e6..00000000
--- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStage.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***************************************************************************
- * Copyright 2015 Kieker Project (http://kieker-monitoring.net)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ***************************************************************************/
-
-package kieker.analysis.plugin.reader.amqp;
-
-import kieker.common.logging.Log;
-import kieker.common.record.IMonitoringRecord;
-
-import teetime.framework.AbstractProducerStage;
-
-/**
- * Reader stage that reads monitoring records from an AMQP queue.
- *
- * @author Lars Erik Bluemke
- *
- */
-public class AMQPReaderStage extends AbstractProducerStage<IMonitoringRecord> {
-
-	private final AMQPReader amqpReader;
-
-	/**
-	 * Creates a new AMQP reader.
-	 *
-	 * @param uri
-	 *            The name of the configuration property for the server URI.
-	 * @param queueName
-	 *            The name of the configuration property for the AMQP queue name.
-	 * @param heartbeat
-	 *            The name of the configuration property for the heartbeat timeout.
-	 */
-	public AMQPReaderStage(final String uri, final String queueName, final int heartbeat, final Log log) {
-		this.amqpReader = new AMQPReader(uri, queueName, heartbeat, log, this);
-	}
-
-	@Override
-	protected void execute() {
-		this.amqpReader.init();
-		this.amqpReader.read();
-		this.terminateStage();
-	}
-
-	protected void deliverRecord(final IMonitoringRecord monitoringRecord) {
-		this.getOutputPort().send(monitoringRecord);
-	}
-
-	/** Terminates the AMQPReader by returning from read method. */
-	public void terminate(final boolean error) {
-		this.amqpReader.terminate(error);
-	}
-}
diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java
index 683f9290..6a797839 100644
--- a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java
+++ b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java
@@ -44,20 +44,20 @@ public class RegularRecordHandler implements Runnable {
 
 	private final ILookup<String> stringRegistry;
 	private final CachedRecordFactoryCatalog cachedRecordFactoryCatalog = CachedRecordFactoryCatalog.getInstance();
-	private final AMQPReader reader;
+	private final AMQPReaderLogic readerLogic;
 
 	private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(DEFAULT_QUEUE_SIZE);
 
 	/**
 	 * Creates a new regular record handler.
 	 *
-	 * @param reader
-	 *            The reader to send the instantiated records to
+	 * @param readerLogic
+	 *            The reader logic class to send the instantiated records to
 	 * @param stringRegistry
 	 *            The string registry to use
 	 */
-	public RegularRecordHandler(final AMQPReader reader, final ILookup<String> stringRegistry) {
-		this.reader = reader;
+	public RegularRecordHandler(final AMQPReaderLogic readerLogic, final ILookup<String> stringRegistry) {
+		this.readerLogic = readerLogic;
 		this.stringRegistry = stringRegistry;
 	}
 
@@ -98,7 +98,7 @@ public class RegularRecordHandler implements Runnable {
 			final IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry);
 			record.setLoggingTimestamp(loggingTimestamp);
 
-			this.reader.deliverRecord(record);
+			this.readerLogic.deliverRecord(record);
 		} catch (final RecordInstantiationException e) {
 			LOG.error("Error instantiating record", e);
 		}
diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java
index 8301faf3..1f84ee9e 100644
--- a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java
+++ b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReader.java
@@ -16,206 +16,20 @@
 
 package kieker.analysis.plugin.reader.jms;
 
-import java.io.Serializable;
-import java.util.Hashtable;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-
+import kieker.analysis.plugin.reader.AbstractReader;
 import kieker.common.logging.Log;
-import kieker.common.record.IMonitoringRecord;
 
 /**
- * Reads monitoring records from a (remote or local) JMS queue.
- *
- *
- * @author Andre van Hoorn, Matthias Rohr, Lars Erik Bluemke
+ * Reads monitoring records from a (remote or local) JMS queue by using the
+ * read-method from JMSReaderLogicModule. JMSReaderLogicModule also delivers
+ * read records to the output port.
  *
- * @since 0.95a
+ * @author Lars Erik Bluemke
  */
-public final class JMSReader {
-
-	private final String jmsProviderUrl;
-	private final String jmsDestination;
-	private final String jmsFactoryLookupName;
-	private final CountDownLatch cdLatch = new CountDownLatch(1);
-
-	private final Log log;
-	private final JMSReaderStage jmsReaderStage;
-
-	/**
-	 * Creates a new instance of this class using the given parameters.
-	 *
-	 * @param jmsProviderUrl
-	 *            The name of the configuration determining the JMS provider URL,
-	 *            e.g. {@code tcp://localhost:3035/}
-	 * @param jmsDestination
-	 *            The name of the configuration determining the JMS destination,
-	 *            e.g. {@code queue1}.
-	 * @param jmsFactoryLookupName
-	 *            The name of the configuration determining the name of the used JMS factory,
-	 *            e.g. {@code org.exolab.jms.jndi.InitialContextFactory}.
-	 * @param log
-	 *            Kieker log.
-	 * @param jmsReaderStage
-	 *            The actual teetime stage which uses this class.
-	 *
-	 * @throws IllegalArgumentException
-	 *             If one of the properties is empty.
-	 */
-	public JMSReader(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log,
-			final JMSReaderStage jmsReaderStage)
-			throws IllegalArgumentException {
-
-		// Initialize the reader bases
-		this.jmsProviderUrl = jmsProviderUrl;
-		this.jmsDestination = jmsDestination;
-		this.jmsFactoryLookupName = jmsFactoryLookupName;
-		this.log = log;
-		this.jmsReaderStage = jmsReaderStage;
-
-		// simple sanity check
-		if ((this.jmsProviderUrl.length() == 0) || (this.jmsDestination.length() == 0) || (this.jmsFactoryLookupName.length() == 0)) {
-			throw new IllegalArgumentException("JMSReader has not sufficient parameters. jmsProviderUrl ('" + this.jmsProviderUrl + "'), jmsDestination ('"
-					+ this.jmsDestination + "'), or factoryLookupName ('" + this.jmsFactoryLookupName + "') is null");
-		}
-	}
-
-	/**
-	 * A call to this method is a blocking call.
-	 *
-	 * @return true if the method succeeds, false otherwise.
-	 */
-	public boolean read() {
-		boolean retVal = true;
-		Connection connection = null;
-		try {
-			final Hashtable<String, String> properties = new Hashtable<String, String>(); // NOPMD NOCS (InitialContext expects Hashtable)
-			properties.put(Context.INITIAL_CONTEXT_FACTORY, this.jmsFactoryLookupName);
-
-			// JMS initialization
-			properties.put(Context.PROVIDER_URL, this.jmsProviderUrl);
-			final Context context = new InitialContext(properties);
-			final ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory");
-			connection = factory.createConnection();
-			final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-			Destination destination;
-			try {
-				// As a first step, try a JNDI lookup (this seems to fail with ActiveMQ sometimes)
-				destination = (Destination) context.lookup(this.jmsDestination);
-			} catch (final NameNotFoundException exc) {
-				// JNDI lookup failed, try manual creation (this seems to fail with ActiveMQ/HornetQ sometimes)
-				destination = session.createQueue(this.jmsDestination);
-				if (destination == null) { //
-					this.log.error("Failed to lookup queue '" + this.jmsDestination + "' via JNDI: " + exc.getMessage() + " AND failed to create queue");
-					throw exc; // will be catched below to abort the read method
-				}
-			}
-
-			this.log.info("Listening to destination:" + destination + " at " + this.jmsProviderUrl + " !\n***\n\n");
-			final MessageConsumer receiver = session.createConsumer(destination);
-			receiver.setMessageListener(new JMSMessageListener());
-
-			// start the connection to enable message delivery
-			connection.start();
+public class JMSReader extends AbstractReader {
 
-			this.log.info("JMSReader started and waits for incoming monitoring events!");
-			this.block();
-			this.log.info("Woke up by shutdown");
-		} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
-			this.log.error("Error in read()", ex);
-			retVal = false;
-		} finally {
-			try {
-				if (connection != null) {
-					connection.close();
-				}
-			} catch (final JMSException ex) {
-				this.log.error("Failed to close JMS", ex);
-			}
-		}
-		return retVal;
+	public JMSReader(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log) {
+		this.readerLogic = new JMSReaderLogic(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, log, this);
 	}
 
-	private final void block() {
-		Runtime.getRuntime().addShutdownHook(new Thread() {
-
-			@Override
-			public final void run() {
-				JMSReader.this.unblock();
-			}
-		});
-		try {
-			this.cdLatch.await();
-		} catch (final InterruptedException e) { // ignore
-		}
-	}
-
-	final void unblock() { // NOPMD (package visible for inner class)
-		this.cdLatch.countDown();
-	}
-
-	final void deliverIndirect(final IMonitoringRecord data) { // NOPMD (package visible for inner class)
-		jmsReaderStage.deliverRecord(data);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	public void terminate(final boolean error) {
-		this.log.info("Shutdown of JMSReader requested.");
-		this.unblock();
-	}
-
-	protected Log getLog() {
-		return log;
-	}
-
-	/**
-	 * The MessageListener will read onMessage each time a message comes in.
-	 */
-	private final class JMSMessageListener implements MessageListener {
-
-		public JMSMessageListener() {
-			// empty default constructor
-		}
-
-		@Override
-		public void onMessage(final Message jmsMessage) {
-			if (jmsMessage == null) {
-				JMSReader.this.getLog().warn("Received null message");
-			} else {
-				if (jmsMessage instanceof ObjectMessage) {
-					try {
-						final ObjectMessage om = (ObjectMessage) jmsMessage;
-						final Serializable omo = om.getObject();
-						if ((omo instanceof IMonitoringRecord)) {
-							JMSReader.this.deliverIndirect((IMonitoringRecord) omo);
-						}
-					} catch (final MessageFormatException ex) {
-						JMSReader.this.getLog().error("Error delivering record", ex);
-					} catch (final JMSException ex) {
-						JMSReader.this.getLog().error("Error delivering record", ex);
-					} catch (final Exception ex) { // NOPMD NOCS (catch Exception)
-						JMSReader.this.getLog().error("Error delivering record", ex);
-					}
-				} else {
-					JMSReader.this.getLog().warn("Received message of invalid type: " + jmsMessage.getClass().getName());
-				}
-			}
-		}
-	}
 }
diff --git a/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java
new file mode 100644
index 00000000..741588bd
--- /dev/null
+++ b/src/main/java/kieker/analysis/plugin/reader/jms/JMSReaderLogic.java
@@ -0,0 +1,224 @@
+/***************************************************************************
+ * Copyright 2015 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+
+package kieker.analysis.plugin.reader.jms;
+
+import java.io.Serializable;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import kieker.analysis.plugin.reader.IReaderLogic;
+import kieker.common.logging.Log;
+import kieker.common.record.IMonitoringRecord;
+
+/**
+ * Reads monitoring records from a (remote or local) JMS queue.
+ *
+ *
+ * @author Andre van Hoorn, Matthias Rohr, Lars Erik Bluemke
+ *
+ * @since 0.95a
+ */
+public final class JMSReaderLogic implements IReaderLogic {
+
+	private final String jmsProviderUrl;
+	private final String jmsDestination;
+	private final String jmsFactoryLookupName;
+	private final CountDownLatch cdLatch = new CountDownLatch(1);
+
+	private final Log log;
+	private final JMSReader jmsReaderStage;
+
+	/**
+	 * Creates a new instance of this class using the given parameters.
+	 *
+	 * @param jmsProviderUrl
+	 *            The name of the configuration determining the JMS provider URL,
+	 *            e.g. {@code tcp://localhost:3035/}
+	 * @param jmsDestination
+	 *            The name of the configuration determining the JMS destination,
+	 *            e.g. {@code queue1}.
+	 * @param jmsFactoryLookupName
+	 *            The name of the configuration determining the name of the used JMS factory,
+	 *            e.g. {@code org.exolab.jms.jndi.InitialContextFactory}.
+	 * @param log
+	 *            Kieker log.
+	 * @param jmsReaderStage
+	 *            The actual teetime stage which uses this class.
+	 *
+	 * @throws IllegalArgumentException
+	 *             If one of the properties is empty.
+	 */
+	public JMSReaderLogic(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName, final Log log,
+			final JMSReader jmsReaderStage)
+			throws IllegalArgumentException {
+
+		// Initialize the reader bases
+		this.jmsProviderUrl = jmsProviderUrl;
+		this.jmsDestination = jmsDestination;
+		this.jmsFactoryLookupName = jmsFactoryLookupName;
+		this.log = log;
+		this.jmsReaderStage = jmsReaderStage;
+
+		// simple sanity check
+		if ((this.jmsProviderUrl.length() == 0) || (this.jmsDestination.length() == 0) || (this.jmsFactoryLookupName.length() == 0)) {
+			throw new IllegalArgumentException("JMSReader has not sufficient parameters. jmsProviderUrl ('" + this.jmsProviderUrl + "'), jmsDestination ('"
+					+ this.jmsDestination + "'), or factoryLookupName ('" + this.jmsFactoryLookupName + "') is null");
+		}
+	}
+
+	/**
+	 * A call to this method is a blocking call.
+	 *
+	 * @return true if the method succeeds, false otherwise.
+	 */
+	@Override
+	public boolean read() {
+		boolean retVal = true;
+		Connection connection = null;
+		try {
+			final Hashtable<String, String> properties = new Hashtable<String, String>(); // NOPMD NOCS (InitialContext expects Hashtable)
+			properties.put(Context.INITIAL_CONTEXT_FACTORY, this.jmsFactoryLookupName);
+
+			// JMS initialization
+			properties.put(Context.PROVIDER_URL, this.jmsProviderUrl);
+			final Context context = new InitialContext(properties);
+			final ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory");
+			connection = factory.createConnection();
+			final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+			Destination destination;
+			try {
+				// As a first step, try a JNDI lookup (this seems to fail with ActiveMQ sometimes)
+				destination = (Destination) context.lookup(this.jmsDestination);
+			} catch (final NameNotFoundException exc) {
+				// JNDI lookup failed, try manual creation (this seems to fail with ActiveMQ/HornetQ sometimes)
+				destination = session.createQueue(this.jmsDestination);
+				if (destination == null) { //
+					this.log.error("Failed to lookup queue '" + this.jmsDestination + "' via JNDI: " + exc.getMessage() + " AND failed to create queue");
+					throw exc; // will be catched below to abort the read method
+				}
+			}
+
+			this.log.info("Listening to destination:" + destination + " at " + this.jmsProviderUrl + " !\n***\n\n");
+			final MessageConsumer receiver = session.createConsumer(destination);
+			receiver.setMessageListener(new JMSMessageListener());
+
+			// start the connection to enable message delivery
+			connection.start();
+
+			this.log.info("JMSReader started and waits for incoming monitoring events!");
+			this.block();
+			this.log.info("Woke up by shutdown");
+		} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
+			this.log.error("Error in read()", ex);
+			retVal = false;
+		} finally {
+			try {
+				if (connection != null) {
+					connection.close();
+				}
+			} catch (final JMSException ex) {
+				this.log.error("Failed to close JMS", ex);
+			}
+		}
+		return retVal;
+	}
+
+	private final void block() {
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+
+			@Override
+			public final void run() {
+				JMSReaderLogic.this.unblock();
+			}
+		});
+		try {
+			this.cdLatch.await();
+		} catch (final InterruptedException e) { // ignore
+		}
+	}
+
+	final void unblock() { // NOPMD (package visible for inner class)
+		this.cdLatch.countDown();
+	}
+
+	final void deliverIndirect(final IMonitoringRecord data) { // NOPMD (package visible for inner class)
+		jmsReaderStage.deliverRecord(data);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void terminate(final boolean error) {
+		this.log.info("Shutdown of JMSReader requested.");
+		this.unblock();
+	}
+
+	protected Log getLog() {
+		return log;
+	}
+
+	/**
+	 * The MessageListener will read onMessage each time a message comes in.
+	 */
+	private final class JMSMessageListener implements MessageListener {
+
+		public JMSMessageListener() {
+			// empty default constructor
+		}
+
+		@Override
+		public void onMessage(final Message jmsMessage) {
+			if (jmsMessage == null) {
+				JMSReaderLogic.this.getLog().warn("Received null message");
+			} else {
+				if (jmsMessage instanceof ObjectMessage) {
+					try {
+						final ObjectMessage om = (ObjectMessage) jmsMessage;
+						final Serializable omo = om.getObject();
+						if ((omo instanceof IMonitoringRecord)) {
+							JMSReaderLogic.this.deliverIndirect((IMonitoringRecord) omo);
+						}
+					} catch (final MessageFormatException ex) {
+						JMSReaderLogic.this.getLog().error("Error delivering record", ex);
+					} catch (final JMSException ex) {
+						JMSReaderLogic.this.getLog().error("Error delivering record", ex);
+					} catch (final Exception ex) { // NOPMD NOCS (catch Exception)
+						JMSReaderLogic.this.getLog().error("Error delivering record", ex);
+					}
+				} else {
+					JMSReaderLogic.this.getLog().warn("Received message of invalid type: " + jmsMessage.getClass().getName());
+				}
+			}
+		}
+	}
+}
diff --git a/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java
similarity index 98%
rename from src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java
rename to src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java
index 91da1d96..833ac3b3 100644
--- a/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderStageTest.java
+++ b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java
@@ -30,7 +30,7 @@ import kieker.common.util.registry.IMonitoringRecordReceiver;
 import kieker.common.util.registry.IRegistry;
 import kieker.common.util.registry.Registry;
 
-public class AMQPReaderStageTest {
+public class AMQPReaderTest {
 
 	private AMQPTestReaderThread amqpReaderThread = null;
 	private AMQPTestWriter amqpWriter = null;
@@ -217,12 +217,12 @@ final class AMQPTestReaderThread extends Thread {
 
 	private static final Log LOG = LogFactory.getLog(AMQPTestReaderThread.class);
 
-	private final AMQPReaderStage amqpReaderStage;
+	private final AMQPReader amqpReaderStage;
 
 	private final List<IMonitoringRecord> outputList = new LinkedList<>();
 
 	public AMQPTestReaderThread(final String uri, final int heartbeat, final String exchangeName, final String queueName) {
-		amqpReaderStage = new AMQPReaderStage(uri, queueName, heartbeat, LOG);
+		amqpReaderStage = new AMQPReader(uri, queueName, heartbeat, LOG);
 	}
 
 	@Override
diff --git a/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java b/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java
similarity index 96%
rename from src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java
rename to src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java
index a6cfadcc..7fa3d00e 100644
--- a/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderStageTest.java
+++ b/src/test/java/kieker/analysis/plugin/reader/jms/JMSReaderTest.java
@@ -30,7 +30,7 @@ import kieker.common.logging.LogFactory;
 import kieker.common.record.IMonitoringRecord;
 import kieker.common.record.system.CPUUtilizationRecord;
 
-public class JMSReaderStageTest {
+public class JMSReaderTest {
 
 	private CPUUtilizationRecord monitoringRecord;
 	private JMSTestReaderThread jmsReaderThread;
@@ -119,7 +119,7 @@ public class JMSReaderStageTest {
  */
 final class JMSTestWriter {
 
-	private static final Log LOG = LogFactory.getLog(JMSReaderStageTest.class);
+	private static final Log LOG = LogFactory.getLog(JMSReaderTest.class);
 
 	private final Connection connection;
 	private final Session session;
@@ -184,12 +184,12 @@ final class JMSTestReaderThread extends Thread {
 
 	private static final Log LOG = LogFactory.getLog(JMSTestReaderThread.class);
 
-	private final JMSReaderStage jmsReaderStage;
+	private final JMSReader jmsReaderStage;
 
 	private final List<IMonitoringRecord> outputList = new LinkedList<>();
 
 	public JMSTestReaderThread(final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName) {
-		jmsReaderStage = new JMSReaderStage(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, LOG);
+		jmsReaderStage = new JMSReader(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, LOG);
 	}
 
 	@Override
-- 
GitLab