Skip to content
Snippets Groups Projects
Commit 2e3fed2e authored by Florian Fittkau's avatar Florian Fittkau
Browse files

moved to TCP instead of RabbitMQ

parent 13534d18
No related branches found
No related tags found
No related merge requests found
package explorviz.hpc_monitoring.reader;
import java.io.*;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import com.rabbitmq.client.*;
import explorviz.hpc_monitoring.StringRegistryRecord;
public class RabbitMQRegistryConsumer extends Thread {
static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD
// package
// for
// inner
// class
private final RabbitMQReader parent;
private Connection connection;
private Channel channel;
private ConnectionFactory factory;
private QueueingConsumer registryConsumer;
private final String providerUrl;
private final String queueName;
private final String password;
private final String username;
private final int port;
public RabbitMQRegistryConsumer(RabbitMQReader parent, String providerUrl,
String queueName, String password, String username, int port) {
this.parent = parent;
this.providerUrl = providerUrl;
this.queueName = queueName;
this.password = password;
this.username = username;
this.port = port;
}
@Override
public void run() {
try {
createConnectionFactory();
connect();
while (!Thread.interrupted()) {
if (!connection.isOpen() || !channel.isOpen()) {
reconnect();
}
final QueueingConsumer.Delivery delivery = registryConsumer
.nextDelivery();
final Object message = readObjectFromBytes(delivery.getBody());
if (message instanceof StringRegistryRecord) {
final StringRegistryRecord record = (StringRegistryRecord) message;
parent.addToRegistry(record.getId(), record.getObject());
}
}
}
catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck)
LOG.error("Error in read()", ex);
}
catch (final ClassNotFoundException e) {
LOG.error("Error in read(): ClassNotFound Exception", e);
}
catch (final ShutdownSignalException e) {
LOG.error("Error in read(): ShutdownSignal Exception", e);
}
catch (final ConsumerCancelledException e) {
LOG.error("Error in read(): ConsumerCancelled Exception", e);
}
catch (final InterruptedException e) {
LOG.error("Error in read(): Interrupted Exception", e);
}
finally {
try {
if (connection != null) {
connection.close();
}
}
catch (final IOException e) {
LOG.error("Error in read()", e);
}
}
}
private void createConnectionFactory() {
factory = new ConnectionFactory();
factory.setHost(providerUrl);
factory.setPort(port);
factory.setConnectionTimeout(0);
factory.setUsername(username);
factory.setPassword(password);
}
private void connect() throws IOException {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
registryConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, registryConsumer);
}
private void reconnect() {
try {
connect();
}
catch (final IOException e) {
RabbitMQReader.LOG.error("Error reestablishing connection", e);
}
}
private Object readObjectFromBytes(final byte[] bs) throws IOException,
ClassNotFoundException {
final ByteArrayInputStream bain = new ByteArrayInputStream(bs);
final ObjectInputStream in = new ObjectInputStream(bain);
final Object message = in.readObject();
return message;
}
}
......@@ -16,12 +16,13 @@
package explorviz.hpc_monitoring.reader;
import java.io.ByteArrayOutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.zip.Inflater;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
......@@ -29,7 +30,6 @@ import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import com.rabbitmq.client.*;
import explorviz.hpc_monitoring.Bits;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.*;
......@@ -42,58 +42,43 @@ import explorviz.hpc_monitoring.record.events.*;
*
* @since 1.8
*/
@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = {
@Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "kieker"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672")
@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = TCPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { Object.class }, description = "Output Port of the JMSReader") }, configuration = {
@Property(name = TCPReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"),
@Property(name = TCPReader.CONFIG_PROPERTY_PORT, defaultValue = "9876")
})
public final class RabbitMQReader extends AbstractReaderPlugin {
public final class TCPReader extends AbstractReaderPlugin {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE];
/** The name of the output port delivering the received records. */
public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
/** The name of the configuration determining the RabbitMQ provider URL. */
public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl";
/**
* The name of the configuration determining the RabbitMQ Queue (e.g.
* queue1).
*/
public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination";
/** The username that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_USER = "guest";
/** The password that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_PASSWORD = "guest";
public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl";
/** The port that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_PORT = "5672";
static final Log LOG = LogFactory
.getLog(RabbitMQReader.class); // NOPMD
// package
// for
// inner
// class
private final String providerUrl;
private final String queueName;
private final String password;
private final String username;
private final int port;
private Connection connection;
private Channel channel;
private ConnectionFactory factory;
private QueueingConsumer normalConsumer;
private final CountDownLatch cdLatch = new CountDownLatch(
1);
private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(
16,
0.75f,
2);
private final RabbitMQRegistryConsumer registryConsumer;
public static final String CONFIG_PROPERTY_PORT = "9876";
static final Log LOG = LogFactory
.getLog(TCPReader.class); // NOPMD
// package
// for
// inner
// class
private final String providerUrl;
private final int port;
private final CountDownLatch cdLatch = new CountDownLatch(
1);
private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(
16,
0.75f,
2);
private ServerSocket serversocket;
private final boolean active = true;
/**
* Creates a new instance of this class using the given parameters.
......@@ -120,7 +105,7 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
* @throws IllegalArgumentException
* If one of the properties is empty.
*/
public RabbitMQReader(final Configuration configuration,
public TCPReader(final Configuration configuration,
final IProjectContext projectContext)
throws IllegalArgumentException {
super(configuration, projectContext);
......@@ -128,21 +113,11 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
// Initialize the reader bases on the given configuration.
providerUrl = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL);
queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE);
username = configuration.getStringProperty(CONFIG_PROPERTY_USER);
password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD);
port = configuration.getIntProperty(CONFIG_PROPERTY_PORT);
// simple sanity check
if ((providerUrl.length() == 0) || (queueName.length() == 0)) {
throw new IllegalArgumentException(
"RabbitMQReader has not sufficient parameters. providerUrl ('"
+ providerUrl + "') or destination ('" + queueName
+ "'), is null");
}
registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl,
"registryRecords", username, password, port);
registryConsumer.start();
// registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl,
// "registryRecords", username, password, port);
// registryConsumer.start();
}
/**
......@@ -169,7 +144,7 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
* @deprecated To be removed in Kieker 1.8.
*/
@Deprecated
public RabbitMQReader(final Configuration configuration)
public TCPReader(final Configuration configuration)
throws IllegalArgumentException {
this(configuration, null);
}
......@@ -182,45 +157,39 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
public boolean read() {
boolean retVal = true;
try {
createConnectionFactory();
connect();
while (!Thread.interrupted()) {
if (!connection.isOpen() || !channel.isOpen()) {
reconnect();
open();
while (active) {
// TODO only one connection!
final Socket socket = serversocket.accept();
BufferedInputStream bufferedInputStream = new BufferedInputStream(
socket.getInputStream(), MESSAGE_BUFFER_SIZE);
int readSize = 0;
int toReadOffset = 0;
while ((readSize = bufferedInputStream.read(messages,
toReadOffset, MESSAGE_BUFFER_SIZE - toReadOffset)) != -1) {
byte[] unreadBytes = messagesfromByteArray(messages,
readSize + toReadOffset);
if (unreadBytes != null) {
toReadOffset = unreadBytes.length;
System.arraycopy(unreadBytes, 0, messages, 0,
toReadOffset);
}
else {
toReadOffset = 0;
}
}
final QueueingConsumer.Delivery delivery = normalConsumer
.nextDelivery();
byte[] batchedMessages = delivery.getBody();
messagesfromByteArray(batchedMessages);
normalConsumer.getChannel().basicAck(
delivery.getEnvelope().getDeliveryTag(), false);
socket.close();
}
}
catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck)
catch (final IOException ex) { // NOPMD NOCS
// (IllegalCatchCheck)
LOG.error("Error in read()", ex);
retVal = false;
}
catch (final ShutdownSignalException e) {
LOG.error("Error in read(): ShutdownSignal Exception", e);
retVal = false;
}
catch (final ConsumerCancelledException e) {
LOG.error("Error in read(): ConsumerCancelled Exception", e);
retVal = false;
}
catch (final InterruptedException e) {
LOG.error("Error in read(): Interrupted Exception", e);
retVal = false;
}
finally {
try {
if (connection != null) {
connection.close();
}
serversocket.close();
}
catch (final IOException e) {
LOG.error("Error in read()", e);
......@@ -229,73 +198,52 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
return retVal;
}
public byte[] decompressByteArray(byte[] bytes) {
ByteArrayOutputStream baos = null;
Inflater iflr = new Inflater();
iflr.setInput(bytes);
baos = new ByteArrayOutputStream();
byte[] tmp = new byte[4096];
try {
while (!iflr.finished()) {
int size = iflr.inflate(tmp);
baos.write(tmp, 0, size);
}
}
catch (Exception ex) {
}
finally {
try {
if (baos != null) {
baos.close();
}
}
catch (Exception ex) {}
}
return baos.toByteArray();
}
private void createConnectionFactory() {
factory = new ConnectionFactory();
factory.setHost(providerUrl);
factory.setPort(port);
factory.setConnectionTimeout(0);
factory.setUsername(username);
factory.setPassword(password);
private void open() throws IOException {
serversocket = new ServerSocket(port);
}
private void connect() throws IOException {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
normalConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, normalConsumer);
channel.basicQos(50);
}
private void reconnect() {
try {
connect();
}
catch (final IOException e) {
RabbitMQReader.LOG.error("Error reestablishing connection", e);
}
}
private void messagesfromByteArray(final byte[] b) {
private byte[] messagesfromByteArray(final byte[] b, int readSize) {
int offset = 0;
int firstValue;
while ((firstValue = Bits.getInt(b, offset)) != -1) {
while (offset < readSize) {
if ((readSize - offset) < 4) {
return createUnreadBytesArray(b, readSize, offset, false);
}
final int clazzId = Bits.getInt(b, offset);
offset += 4;
switch (clazzId) {
case 0: {
if ((readSize - offset) < (8 + 4 + 8 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
}
case 1: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
}
case 2: {
if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
}
case 3: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
}
case 4: {
if ((readSize - offset) < (4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
}
}
IMonitoringRecord record = null;
switch (firstValue) {
switch (clazzId) {
case 0: {
final long traceId = Bits.getLong(b, offset);
offset += 8;
......@@ -305,7 +253,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
offset += 8;
final int parentOrderId = Bits.getInt(b, offset);
offset += 4;
offset += 4; // dummy for nextOrderIndex
record = new Trace(traceId,
getStringFromRegistry(hostnameId), parentTraceId,
......@@ -357,15 +304,49 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
orderIndex, getStringFromRegistry(operationId));
break;
}
case 4: {
final Integer mapId = Bits.getInt(b, offset);
offset += 4;
final int stringLength = Bits.getInt(b, offset);
offset += 4;
if ((readSize - offset) < stringLength) {
return createUnreadBytesArray(b, readSize, offset - 8,
true);
}
byte[] stringBytes = new byte[stringLength];
System.arraycopy(b, offset, stringBytes, 0, stringLength);
String string = new String(stringBytes);
offset += stringLength;
addToRegistry(mapId, string);
break;
}
default: {
LOG.error("unknown class id " + firstValue);
LOG.error("unknown class id " + clazzId);
}
}
if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) {
if ((record != null)
&& !super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) {
LOG.error("deliverRecord returned false");
}
}
return null;
}
private byte[] createUnreadBytesArray(final byte[] b, int readSize,
int offset, boolean withClazzId) {
if (withClazzId) {
offset -= 4;
}
final int unreadBytesSize = readSize - offset;
final byte[] unreadBytes = new byte[unreadBytesSize];
System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize);
return unreadBytes;
}
final void unblock() { // NOPMD (package visible for inner class)
......@@ -377,7 +358,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
*/
public void terminate(final boolean error) {
LOG.info("Shutdown of RabbitMQReader requested.");
registryConsumer.interrupt();
unblock();
}
......@@ -390,9 +370,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin {
configuration
.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl);
configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName);
configuration.setProperty(CONFIG_PROPERTY_USER, username);
configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password);
return configuration;
}
......
......@@ -6,11 +6,11 @@ import kieker.analysis.IAnalysisController
import kieker.analysis.plugin.filter.forward.TeeFilter
import kieker.analysis.plugin.filter.forward.CountingThroughputFilter
import explorviz.hpc_monitoring.reader.RabbitMQReader
import kieker.analysis.plugin.reader.timer.TimeReader
import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter
import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter
import explorviz.hpc_monitoring.connector.RabbitMQConnector
import explorviz.hpc_monitoring.reader.TCPReader
class WorkerController {
......@@ -21,38 +21,39 @@ class WorkerController {
val rabbitMQ = initRabbitMQ()
val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter()
val aggregationFilter = initAggregationFilter()
val timer = initTimer()
// val countingThroughputFilter = initCountingThroughputFilter()
// val teeFilter = initTeeFilter()
val rabbitMQConnector = initRabbitMQConnector()
// val aggregationFilter = initAggregationFilter()
// val timer = initTimer()
val countingThroughputFilter = initCountingThroughputFilter()
val teeFilter = initTeeFilter()
// val rabbitMQConnector = initRabbitMQConnector()
analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
analysisInstance.connect(rabbitMQ, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
// analysisInstance.connect(eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector,
RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
analysisInstance.connect(timer,
TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
analysisInstance.connect(countingThroughputFilter,
CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
TeeFilter::INPUT_PORT_NAME_EVENTS)
// analysisInstance.connect(eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
//
// analysisInstance.connect(eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector,
// RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
//
// analysisInstance.connect(timer,
// TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
// analysisInstance.connect(aggregationFilter,
// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, teeFilter,
// TeeFilter::INPUT_PORT_NAME_EVENTS)
analysisInstance.connect(aggregationFilter,
TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector,
RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
// analysisInstance.connect(aggregationFilter,
// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector,
// RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
try {
analysisInstance.run()
......@@ -63,15 +64,14 @@ class WorkerController {
def initRabbitMQ() {
val rabbitConfig = new Configuration()
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost")
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "kieker")
new RabbitMQReader(rabbitConfig, analysisInstance)
rabbitConfig.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1")
new TCPReader(rabbitConfig, analysisInstance)
}
def initRabbitMQConnector() {
val rabbitConfig = new Configuration()
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost")
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "validTraces")
rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_PROVIDER, "localhost")
rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_QUEUE, "validTraces")
new RabbitMQConnector(rabbitConfig, analysisInstance)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment