Skip to content
Snippets Groups Projects
Commit 86d47cde authored by Lars Erik Blümke's avatar Lars Erik Blümke
Browse files

first version of amqp reader test - test record is not recognized as a regular record yet

parent aa357282
No related branches found
No related tags found
No related merge requests found
...@@ -139,6 +139,8 @@ public final class AMQPReaderLogicModule { ...@@ -139,6 +139,8 @@ public final class AMQPReaderLogicModule {
} }
public boolean read() { public boolean read() {
// System.out.println(("I'm now in read()"));
// Start the worker threads, if necessary // Start the worker threads, if necessary
if (!this.threadsStarted) { if (!this.threadsStarted) {
this.registryRecordHandlerThread.start(); this.registryRecordHandlerThread.start();
...@@ -150,9 +152,14 @@ public final class AMQPReaderLogicModule { ...@@ -150,9 +152,14 @@ public final class AMQPReaderLogicModule {
this.channel.basicConsume(this.queueName, true, this.consumer); this.channel.basicConsume(this.queueName, true, this.consumer);
while (!this.terminated) { while (!this.terminated) {
System.out.println("bp1");
final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery(); final QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
System.out.println("bp2");
final byte[] body = delivery.getBody(); final byte[] body = delivery.getBody();
String bodyStr = new String(body);
System.out.println("Received: " + bodyStr);
final ByteBuffer buffer = ByteBuffer.wrap(body); final ByteBuffer buffer = ByteBuffer.wrap(body);
final byte recordType = buffer.get(); final byte recordType = buffer.get();
...@@ -177,6 +184,7 @@ public final class AMQPReaderLogicModule { ...@@ -177,6 +184,7 @@ public final class AMQPReaderLogicModule {
return false; return false;
} }
// System.out.println("Exiting read()");
return true; return true;
} }
......
package kieker.analysis.plugin.reader.amqp;
import static teetime.framework.test.StageTester.test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.system.CPUUtilizationRecord;
public class AMQPReaderTest {
private AMQPReader amqpReader;
private CPUUtilizationRecord record;
// AMQPReader constructor arguments
private final String uri = "amqp://guest:guest@localhost"; // See the amqp uri scheme documentation for detailed information about uri structure
// (amqp://localhost works as well)
private final String queueName = "testQueue";
private final int heartbeat = 60;
private final Log log = LogFactory.getLog(this.getClass().getName());
// AMQP connection parts
private Connection connection;
private Channel channel;
// Record data
private final long timestamp = 1L;
private final String hostname = "test_host";
private final String cpuID = "cpu_1";
private final double user = 2.0;
private final double system = 3.0;
private final double wait = 4.0;
private final double nice = 5.0;
private final double irq = 6.0;
private final double totalUtilisation = 7.0;
private final double idle = 8.0;
@Before
public void initializeAMQPReader() {
amqpReader = new AMQPReader(uri, queueName, heartbeat, log);
record = new CPUUtilizationRecord(timestamp, hostname, cpuID, user, system, wait, nice, irq, totalUtilisation, idle);
}
@Test
public void simpleAMQPTest() {
new ReaderThread().start();
try {
connection = createConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
// Sending the test record as a byte array
channel.basicPublish("", queueName, null, serialize(record));
System.out.println("Sent record");
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(this.uri);
connectionFactory.setRequestedHeartbeat(this.heartbeat);
return connectionFactory.newConnection();
}
private byte[] serialize(final Object object) {
try {
ByteArrayOutputStream b = new ByteArrayOutputStream();
ObjectOutputStream o = new ObjectOutputStream(b);
o.writeObject(object);
return b.toByteArray();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
private class ReaderThread extends Thread {
List<IMonitoringRecord> outputList = new LinkedList<>();
@Override
public void run() {
// amqpReader.execute();
test(amqpReader).and().receive(outputList).from(amqpReader.getOutputPort()).start();
for (IMonitoringRecord r : outputList) {
System.out.println("Finally delivered: " + r.toString());
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment