Skip to content
Snippets Groups Projects
Commit 822fd3f6 authored by Lars Erik Blümke's avatar Lars Erik Blümke
Browse files

removed debugging statements from AMQPReaderLogicModule and stashed buggy AMQPReaderTest for now

parent fe022edc
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -139,8 +139,6 @@ public final class AMQPReaderLogicModule {
}
public boolean read() {
// System.out.println(("I'm now in read()"));
// Start the worker threads, if necessary
if (!this.threadsStarted) {
this.registryRecordHandlerThread.start();
......@@ -188,7 +186,6 @@ public final class AMQPReaderLogicModule {
return false;
}
// System.out.println("Exiting read()");
return true;
}
......
package kieker.analysis.plugin.reader.amqp;
import static teetime.framework.test.StageTester.test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.record.system.CPUUtilizationRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
public class AMQPReaderTest {
private AMQPReader amqpReader;
private CPUUtilizationRecord record;
private RegistryRecord registryRecord;
// AMQPReader constructor arguments
private final String uri = "amqp://guest:guest@localhost"; // See the amqp uri scheme documentation for detailed information about uri structure
// (amqp://localhost works as well)
private final String queueName = "testQueue";
private final int heartbeat = 60;
private final Log log = LogFactory.getLog(this.getClass().getName());
// AMQP connection parts
private Connection connection;
private Channel channel;
// Record data
private final long timestamp = 1L;
private final String hostname = "test_host";
private final String cpuID = "cpu_1";
private final double user = 2.0;
private final double system = 3.0;
private final double wait = 4.0;
private final double nice = 5.0;
private final double irq = 6.0;
private final double totalUtilisation = 7.0;
private final double idle = 8.0;
@Before
public void initializeAMQPReader() {
amqpReader = new AMQPReader(uri, queueName, heartbeat, log);
record = new CPUUtilizationRecord(timestamp, hostname, cpuID, user, system, wait, nice, irq, totalUtilisation, idle);
registryRecord = new RegistryRecord(0xFFFFFFFF, "holla");
}
@Test
public void simpleAMQPTest() {
new ReaderThread().start();
try {
connection = createConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
ILookup<String> stringRegistry = new Lookup<String>();
// Sending the test record as a byte array
ByteBuffer buffer = ByteBuffer.allocate(1000);
registryRecord.writeBytes(buffer, stringRegistry);
System.out.println("RecordType before sending " + buffer.get() + buffer.get() + buffer.get());
// channel.basicPublish("", queueName, null, buffer);
System.out.println("Sent record");
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(this.uri);
connectionFactory.setRequestedHeartbeat(this.heartbeat);
return connectionFactory.newConnection();
}
private byte[] serialize(final Object object) {
try {
ByteArrayOutputStream b = new ByteArrayOutputStream();
ObjectOutputStream o = new ObjectOutputStream(b);
o.writeObject(object);
return b.toByteArray();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
private class ReaderThread extends Thread {
List<IMonitoringRecord> outputList = new LinkedList<>();
@Override
public void run() {
// amqpReader.execute();
test(amqpReader).and().receive(outputList).from(amqpReader.getOutputPort()).start();
for (IMonitoringRecord r : outputList) {
System.out.println("Finally delivered: " + r.toString());
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment