diff --git a/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java b/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java deleted file mode 100644 index 58d55ad8daecebab0245b39a12cc8d3ea77aa1ea..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java +++ /dev/null @@ -1,127 +0,0 @@ -package explorviz.hpc_monitoring.reader; - -import java.io.*; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import com.rabbitmq.client.*; -import explorviz.hpc_monitoring.StringRegistryRecord; - -public class RabbitMQRegistryConsumer extends Thread { - - static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD - // package - // for - // inner - // class - - private final RabbitMQReader parent; - - private Connection connection; - private Channel channel; - private ConnectionFactory factory; - private QueueingConsumer registryConsumer; - - private final String providerUrl; - private final String queueName; - private final String password; - private final String username; - private final int port; - - public RabbitMQRegistryConsumer(RabbitMQReader parent, String providerUrl, - String queueName, String password, String username, int port) { - this.parent = parent; - this.providerUrl = providerUrl; - this.queueName = queueName; - this.password = password; - this.username = username; - this.port = port; - - } - - @Override - public void run() { - try { - createConnectionFactory(); - - connect(); - - while (!Thread.interrupted()) { - - if (!connection.isOpen() || !channel.isOpen()) { - reconnect(); - } - - final QueueingConsumer.Delivery delivery = registryConsumer - .nextDelivery(); - - final Object message = readObjectFromBytes(delivery.getBody()); - - if (message instanceof StringRegistryRecord) { - final StringRegistryRecord record = (StringRegistryRecord) message; - parent.addToRegistry(record.getId(), record.getObject()); - } - } - } - catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) - LOG.error("Error in read()", ex); - } - catch (final ClassNotFoundException e) { - LOG.error("Error in read(): ClassNotFound Exception", e); - } - catch (final ShutdownSignalException e) { - LOG.error("Error in read(): ShutdownSignal Exception", e); - } - catch (final ConsumerCancelledException e) { - LOG.error("Error in read(): ConsumerCancelled Exception", e); - } - catch (final InterruptedException e) { - LOG.error("Error in read(): Interrupted Exception", e); - } - finally { - try { - if (connection != null) { - connection.close(); - } - } - catch (final IOException e) { - LOG.error("Error in read()", e); - } - } - } - - private void createConnectionFactory() { - factory = new ConnectionFactory(); - factory.setHost(providerUrl); - factory.setPort(port); - factory.setConnectionTimeout(0); - factory.setUsername(username); - factory.setPassword(password); - } - - private void connect() throws IOException { - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.queueDeclare(queueName, false, false, false, null); - - registryConsumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, true, registryConsumer); - } - - private void reconnect() { - try { - connect(); - } - catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); - } - } - - private Object readObjectFromBytes(final byte[] bs) throws IOException, - ClassNotFoundException { - final ByteArrayInputStream bain = new ByteArrayInputStream(bs); - final ObjectInputStream in = new ObjectInputStream(bain); - final Object message = in.readObject(); - return message; - } -} diff --git a/src/explorviz/hpc_monitoring/reader/RabbitMQReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java similarity index 58% rename from src/explorviz/hpc_monitoring/reader/RabbitMQReader.java rename to src/explorviz/hpc_monitoring/reader/TCPReader.java index 58ffd94b17e250c19aa3bc757e47e12c16697a0e..7848fb4da35b6d092b507e30afd3afb280b1374e 100644 --- a/src/explorviz/hpc_monitoring/reader/RabbitMQReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -16,12 +16,13 @@ package explorviz.hpc_monitoring.reader; -import java.io.ByteArrayOutputStream; +import java.io.BufferedInputStream; import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.zip.Inflater; import kieker.analysis.IProjectContext; import kieker.analysis.plugin.annotation.*; import kieker.analysis.plugin.reader.AbstractReaderPlugin; @@ -29,7 +30,6 @@ import kieker.common.configuration.Configuration; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; import kieker.common.record.IMonitoringRecord; -import com.rabbitmq.client.*; import explorviz.hpc_monitoring.Bits; import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.events.*; @@ -42,58 +42,43 @@ import explorviz.hpc_monitoring.record.events.*; * * @since 1.8 */ -@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = { - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "kieker"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") +@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = TCPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { Object.class }, description = "Output Port of the JMSReader") }, configuration = { + @Property(name = TCPReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), + @Property(name = TCPReader.CONFIG_PROPERTY_PORT, defaultValue = "9876") }) -public final class RabbitMQReader extends AbstractReaderPlugin { +public final class TCPReader extends AbstractReaderPlugin { + private static final int MESSAGE_BUFFER_SIZE = 65536; + + private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; /** The name of the configuration determining the RabbitMQ provider URL. */ - public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; - /** - * The name of the configuration determining the RabbitMQ Queue (e.g. - * queue1). - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; - /** The username that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** The password that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; /** The port that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PORT = "5672"; - - static final Log LOG = LogFactory - .getLog(RabbitMQReader.class); // NOPMD - // package - // for - // inner - // class - - private final String providerUrl; - private final String queueName; - private final String password; - private final String username; - private final int port; - private Connection connection; - private Channel channel; - private ConnectionFactory factory; - private QueueingConsumer normalConsumer; - - private final CountDownLatch cdLatch = new CountDownLatch( - 1); - - private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>( - 16, - 0.75f, - 2); - - private final RabbitMQRegistryConsumer registryConsumer; + public static final String CONFIG_PROPERTY_PORT = "9876"; + + static final Log LOG = LogFactory + .getLog(TCPReader.class); // NOPMD + // package + // for + // inner + // class + + private final String providerUrl; + private final int port; + + private final CountDownLatch cdLatch = new CountDownLatch( + 1); + + private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>( + 16, + 0.75f, + 2); + + private ServerSocket serversocket; + private final boolean active = true; /** * Creates a new instance of this class using the given parameters. @@ -120,7 +105,7 @@ public final class RabbitMQReader extends AbstractReaderPlugin { * @throws IllegalArgumentException * If one of the properties is empty. */ - public RabbitMQReader(final Configuration configuration, + public TCPReader(final Configuration configuration, final IProjectContext projectContext) throws IllegalArgumentException { super(configuration, projectContext); @@ -128,21 +113,11 @@ public final class RabbitMQReader extends AbstractReaderPlugin { // Initialize the reader bases on the given configuration. providerUrl = configuration .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); - queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); - username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); - // simple sanity check - if ((providerUrl.length() == 0) || (queueName.length() == 0)) { - throw new IllegalArgumentException( - "RabbitMQReader has not sufficient parameters. providerUrl ('" - + providerUrl + "') or destination ('" + queueName - + "'), is null"); - } - registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, - "registryRecords", username, password, port); - registryConsumer.start(); + // registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, + // "registryRecords", username, password, port); + // registryConsumer.start(); } /** @@ -169,7 +144,7 @@ public final class RabbitMQReader extends AbstractReaderPlugin { * @deprecated To be removed in Kieker 1.8. */ @Deprecated - public RabbitMQReader(final Configuration configuration) + public TCPReader(final Configuration configuration) throws IllegalArgumentException { this(configuration, null); } @@ -182,45 +157,39 @@ public final class RabbitMQReader extends AbstractReaderPlugin { public boolean read() { boolean retVal = true; try { - createConnectionFactory(); - connect(); - while (!Thread.interrupted()) { - - if (!connection.isOpen() || !channel.isOpen()) { - reconnect(); + open(); + while (active) { + // TODO only one connection! + final Socket socket = serversocket.accept(); + BufferedInputStream bufferedInputStream = new BufferedInputStream( + socket.getInputStream(), MESSAGE_BUFFER_SIZE); + int readSize = 0; + int toReadOffset = 0; + while ((readSize = bufferedInputStream.read(messages, + toReadOffset, MESSAGE_BUFFER_SIZE - toReadOffset)) != -1) { + byte[] unreadBytes = messagesfromByteArray(messages, + readSize + toReadOffset); + if (unreadBytes != null) { + toReadOffset = unreadBytes.length; + System.arraycopy(unreadBytes, 0, messages, 0, + toReadOffset); + } + else { + toReadOffset = 0; + } } - final QueueingConsumer.Delivery delivery = normalConsumer - .nextDelivery(); - - byte[] batchedMessages = delivery.getBody(); - messagesfromByteArray(batchedMessages); - - normalConsumer.getChannel().basicAck( - delivery.getEnvelope().getDeliveryTag(), false); + socket.close(); } } - catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + catch (final IOException ex) { // NOPMD NOCS + // (IllegalCatchCheck) LOG.error("Error in read()", ex); retVal = false; } - catch (final ShutdownSignalException e) { - LOG.error("Error in read(): ShutdownSignal Exception", e); - retVal = false; - } - catch (final ConsumerCancelledException e) { - LOG.error("Error in read(): ConsumerCancelled Exception", e); - retVal = false; - } - catch (final InterruptedException e) { - LOG.error("Error in read(): Interrupted Exception", e); - retVal = false; - } finally { try { - if (connection != null) { - connection.close(); - } + serversocket.close(); } catch (final IOException e) { LOG.error("Error in read()", e); @@ -229,73 +198,52 @@ public final class RabbitMQReader extends AbstractReaderPlugin { return retVal; } - public byte[] decompressByteArray(byte[] bytes) { - - ByteArrayOutputStream baos = null; - Inflater iflr = new Inflater(); - iflr.setInput(bytes); - baos = new ByteArrayOutputStream(); - byte[] tmp = new byte[4096]; - try { - while (!iflr.finished()) { - int size = iflr.inflate(tmp); - baos.write(tmp, 0, size); - } - } - catch (Exception ex) { - - } - finally { - try { - if (baos != null) { - baos.close(); - } - } - catch (Exception ex) {} - } - - return baos.toByteArray(); - } - - private void createConnectionFactory() { - factory = new ConnectionFactory(); - factory.setHost(providerUrl); - factory.setPort(port); - factory.setConnectionTimeout(0); - factory.setUsername(username); - factory.setPassword(password); + private void open() throws IOException { + serversocket = new ServerSocket(port); } - private void connect() throws IOException { - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.queueDeclare(queueName, false, false, false, null); - - normalConsumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, false, normalConsumer); - channel.basicQos(50); - } - - private void reconnect() { - try { - connect(); - } - catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); - } - } - - private void messagesfromByteArray(final byte[] b) { + private byte[] messagesfromByteArray(final byte[] b, int readSize) { int offset = 0; - int firstValue; - while ((firstValue = Bits.getInt(b, offset)) != -1) { + while (offset < readSize) { + if ((readSize - offset) < 4) { + return createUnreadBytesArray(b, readSize, offset, false); + } + + final int clazzId = Bits.getInt(b, offset); offset += 4; + switch (clazzId) { + case 0: { + if ((readSize - offset) < (8 + 4 + 8 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + } + case 1: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + } + case 2: { + if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + } + case 3: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + } + case 4: { + if ((readSize - offset) < (4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + } + } + IMonitoringRecord record = null; - switch (firstValue) { + switch (clazzId) { case 0: { final long traceId = Bits.getLong(b, offset); offset += 8; @@ -305,7 +253,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin { offset += 8; final int parentOrderId = Bits.getInt(b, offset); offset += 4; - offset += 4; // dummy for nextOrderIndex record = new Trace(traceId, getStringFromRegistry(hostnameId), parentTraceId, @@ -357,15 +304,49 @@ public final class RabbitMQReader extends AbstractReaderPlugin { orderIndex, getStringFromRegistry(operationId)); break; } + case 4: { + final Integer mapId = Bits.getInt(b, offset); + offset += 4; + final int stringLength = Bits.getInt(b, offset); + offset += 4; + + if ((readSize - offset) < stringLength) { + return createUnreadBytesArray(b, readSize, offset - 8, + true); + } + + byte[] stringBytes = new byte[stringLength]; + System.arraycopy(b, offset, stringBytes, 0, stringLength); + String string = new String(stringBytes); + offset += stringLength; + + addToRegistry(mapId, string); + + break; + } default: { - LOG.error("unknown class id " + firstValue); + LOG.error("unknown class id " + clazzId); } } - if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { + if ((record != null) + && !super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { LOG.error("deliverRecord returned false"); } } + + return null; + } + + private byte[] createUnreadBytesArray(final byte[] b, int readSize, + int offset, boolean withClazzId) { + if (withClazzId) { + offset -= 4; + } + final int unreadBytesSize = readSize - offset; + final byte[] unreadBytes = new byte[unreadBytesSize]; + System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); + return unreadBytes; } final void unblock() { // NOPMD (package visible for inner class) @@ -377,7 +358,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin { */ public void terminate(final boolean error) { LOG.info("Shutdown of RabbitMQReader requested."); - registryConsumer.interrupt(); unblock(); } @@ -390,9 +370,6 @@ public final class RabbitMQReader extends AbstractReaderPlugin { configuration .setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl); - configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName); - configuration.setProperty(CONFIG_PROPERTY_USER, username); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); return configuration; } diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index 926da1644a959e5788bdeb74a4bb2b8f680db23b..eb5ae8185441d2efb7864ed34dbad15df9372f1c 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -6,11 +6,11 @@ import kieker.analysis.IAnalysisController import kieker.analysis.plugin.filter.forward.TeeFilter import kieker.analysis.plugin.filter.forward.CountingThroughputFilter -import explorviz.hpc_monitoring.reader.RabbitMQReader import kieker.analysis.plugin.reader.timer.TimeReader import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter import explorviz.hpc_monitoring.connector.RabbitMQConnector +import explorviz.hpc_monitoring.reader.TCPReader class WorkerController { @@ -21,38 +21,39 @@ class WorkerController { val rabbitMQ = initRabbitMQ() val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() - val aggregationFilter = initAggregationFilter() - val timer = initTimer() -// val countingThroughputFilter = initCountingThroughputFilter() -// val teeFilter = initTeeFilter() - val rabbitMQConnector = initRabbitMQConnector() +// val aggregationFilter = initAggregationFilter() +// val timer = initTimer() + val countingThroughputFilter = initCountingThroughputFilter() + val teeFilter = initTeeFilter() +// val rabbitMQConnector = initRabbitMQConnector() - analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, + analysisInstance.connect(rabbitMQ, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, -// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, + CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) - - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector, - RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) - - analysisInstance.connect(timer, - TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) + analysisInstance.connect(countingThroughputFilter, + CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, + TeeFilter::INPUT_PORT_NAME_EVENTS) + +// analysisInstance.connect(eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, +// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) +// +// analysisInstance.connect(eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector, +// RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) +// +// analysisInstance.connect(timer, +// TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, +// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) -// analysisInstance.connect(aggregationFilter, -// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, teeFilter, -// TeeFilter::INPUT_PORT_NAME_EVENTS) - analysisInstance.connect(aggregationFilter, - TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector, - RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) +// analysisInstance.connect(aggregationFilter, +// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector, +// RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) try { analysisInstance.run() @@ -63,15 +64,14 @@ class WorkerController { def initRabbitMQ() { val rabbitConfig = new Configuration() - rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost") - rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "kieker") - new RabbitMQReader(rabbitConfig, analysisInstance) + rabbitConfig.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1") + new TCPReader(rabbitConfig, analysisInstance) } def initRabbitMQConnector() { val rabbitConfig = new Configuration() - rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost") - rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "validTraces") + rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_PROVIDER, "localhost") + rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_QUEUE, "validTraces") new RabbitMQConnector(rabbitConfig, analysisInstance) }