From 86d47cde425faed87743523a664fbeac04286549 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lars=20Blu=CC=88mke?= <lbl@informatik.uni-kiel.de> Date: Wed, 11 May 2016 20:28:37 +0200 Subject: [PATCH] first version of amqp reader test - test record is not recognized as a regular record yet --- .../reader/amqp/AMQPReaderLogicModule.java | 8 ++ .../plugin/reader/amqp/AMQPReaderTest.java | 118 ++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java index f5f9de21..7f3d07c7 100644 --- a/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/AMQPReaderLogicModule.java @@ -139,6 +139,8 @@ public final class AMQPReaderLogicModule { } public boolean read() { + // System.out.println(("I'm now in read()")); + // Start the worker threads, if necessary if (!this.threadsStarted) { this.registryRecordHandlerThread.start(); @@ -150,9 +152,14 @@ public final class AMQPReaderLogicModule { this.channel.basicConsume(this.queueName, true, this.consumer); while (!this.terminated) { + System.out.println("bp1"); final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); + System.out.println("bp2"); final byte[] body = delivery.getBody(); + String bodyStr = new String(body); + System.out.println("Received: " + bodyStr); + final ByteBuffer buffer = ByteBuffer.wrap(body); final byte recordType = buffer.get(); @@ -177,6 +184,7 @@ public final class AMQPReaderLogicModule { return false; } + // System.out.println("Exiting read()"); return true; } diff --git a/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java new file mode 100644 index 00000000..0dc87c7c --- /dev/null +++ b/src/test/java/kieker/analysis/plugin/reader/amqp/AMQPReaderTest.java @@ -0,0 +1,118 @@ +package kieker.analysis.plugin.reader.amqp; + +import static teetime.framework.test.StageTester.test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.junit.Before; +import org.junit.Test; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.system.CPUUtilizationRecord; + +public class AMQPReaderTest { + + private AMQPReader amqpReader; + private CPUUtilizationRecord record; + + // AMQPReader constructor arguments + private final String uri = "amqp://guest:guest@localhost"; // See the amqp uri scheme documentation for detailed information about uri structure + // (amqp://localhost works as well) + private final String queueName = "testQueue"; + private final int heartbeat = 60; + private final Log log = LogFactory.getLog(this.getClass().getName()); + + // AMQP connection parts + private Connection connection; + private Channel channel; + + // Record data + private final long timestamp = 1L; + private final String hostname = "test_host"; + private final String cpuID = "cpu_1"; + private final double user = 2.0; + private final double system = 3.0; + private final double wait = 4.0; + private final double nice = 5.0; + private final double irq = 6.0; + private final double totalUtilisation = 7.0; + private final double idle = 8.0; + + @Before + public void initializeAMQPReader() { + amqpReader = new AMQPReader(uri, queueName, heartbeat, log); + record = new CPUUtilizationRecord(timestamp, hostname, cpuID, user, system, wait, nice, irq, totalUtilisation, idle); + } + + @Test + public void simpleAMQPTest() { + new ReaderThread().start(); + + try { + connection = createConnection(); + channel = connection.createChannel(); + channel.queueDeclare(queueName, false, false, false, null); + + // Sending the test record as a byte array + channel.basicPublish("", queueName, null, serialize(record)); + System.out.println("Sent record"); + + channel.close(); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException { + final ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setUri(this.uri); + connectionFactory.setRequestedHeartbeat(this.heartbeat); + + return connectionFactory.newConnection(); + } + + private byte[] serialize(final Object object) { + try { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + + o.writeObject(object); + + return b.toByteArray(); + + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + private class ReaderThread extends Thread { + List<IMonitoringRecord> outputList = new LinkedList<>(); + + @Override + public void run() { + // amqpReader.execute(); + test(amqpReader).and().receive(outputList).from(amqpReader.getOutputPort()).start(); + + for (IMonitoringRecord r : outputList) { + System.out.println("Finally delivered: " + r.toString()); + } + } + } +} -- GitLab