diff --git a/.classpath b/.classpath index a5354eb322bde45a25257429317c45144b513fae..f0138a29495626303916ca763b8d879b126948b5 100644 --- a/.classpath +++ b/.classpath @@ -2,9 +2,9 @@ <classpath> <classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/> <classpathentry kind="src" path="xtend-gen"/> - <classpathentry excluding="kieker/analysis/plugin/reader/mq/|explorviz/hpc_monitoring/connector/" kind="src" path="src"/> + <classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> - <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT_fast.jar"/> <classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/> + <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT_fast.jar"/> <classpathentry kind="output" path="bin"/> </classpath> diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..7279228defdbf5e72938e172b331c88f19f5cc65 --- /dev/null +++ b/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,54 @@ +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=true +sp_cleanup.always_use_this_for_non_static_field_access=false +sp_cleanup.always_use_this_for_non_static_method_access=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=false +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=false +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=true +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=true diff --git a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java deleted file mode 100644 index f46f6703e4767cc19e5e5ae9c75218367ca3a4e2..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java +++ /dev/null @@ -1,459 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package explorviz.hpc_monitoring.connector; - -import java.io.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.IMonitoringRecord; -import com.rabbitmq.client.*; -import explorviz.hpc_monitoring.Bits; -import explorviz.hpc_monitoring.StringRegistryRecord; - -/** - * A plugin used for kieker in the cloud. - * All incoming events are put into a RabbitMQ, but are also passed to an output - * port that can be used for - * testing purposes. - * - * @author Santje Finke - * - * @since 1.8 - * - */ -@Plugin(description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", outputPorts = { @OutputPort(name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, description = "Provides each incoming object") }, configuration = { - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") }) -public class RabbitMQConnector extends AbstractFilterPlugin { - private static final int MESSAGE_BUFFER_SIZE = 1500; - - /** - * The name of the input port receiving the incoming events. - */ - public static final String INPUT_PORT_NAME_INVALID_TRACES = "inputInvalidTraces"; - public static final String INPUT_PORT_NAME_VALID_TRACES = "inputValidTraces"; - - /** - * The name of the output port passing the incoming events. - */ - public static final String OUTPUT_PORT_NAME = "relayedEvents"; - /** - * The name of the property determining the address of the used Server. - */ - public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; - /** - * The name of the property determining the name of the Queue. - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; - - /** - * The username that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** - * The password that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; - - private static final Log LOG = LogFactory - .getLog(RabbitMQConnector.class); - - private ConnectionFactory factory; - private Connection connection; - - private final String providerUrl; - private Channel channel; - private final String queue; - - private String username = "guest"; - private String password = "guest"; - - private final byte[] validTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; - private int validTracesMessagesOffset = 0; - - private final byte[] invalidTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; - private int invalidTracesMessagesOffset = 0; - - private final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>( - 16, - 0.75f, - 2); - - private final AtomicInteger stringRegIndex = new AtomicInteger( - 0); - - public RabbitMQConnector(final Configuration configuration, - final IProjectContext projectContext) { - super(configuration, projectContext); - providerUrl = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); - queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); - username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); - createConnectionFactory(providerUrl); - try { - connect(); - } - catch (IOException e) { - e.printStackTrace(); - } - } - - private void createConnectionFactory(final String provider) { - factory = new ConnectionFactory(); - factory.setHost(provider); - factory.setConnectionTimeout(0); - factory.setUsername(username); - factory.setPassword(password); - } - - private void connect() throws IOException { - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.queueDeclare("validTracesMaster", false, false, false, null); - channel.queueDeclare("invalidTracesMaster", false, false, false, null); - channel.queueDeclare("registryRecordsMaster", false, false, false, null); - } - - private void sendRegistryRecord(StringRegistryRecord record) { - // TODO propagate to each new rabbitMQ queue - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - - ObjectOutputStream out; - try { - out = new ObjectOutputStream(boas); - out.writeObject(record); - out.close(); - - byte[] message2 = boas.toByteArray(); - sendMessage(message2, "registryRecordsMaster"); - } - catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * This method represents the input port of this filter. - * - * @param event - * The next event. - */ - @InputPort(name = INPUT_PORT_NAME_VALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") - public final void inputValidTraces(final Object monitoringRecord) { - try { - // if (monitoringRecord instanceof IMonitoringRecord) { - // byte[] message2 = toByteArray((IMonitoringRecord) - // monitoringRecord); - - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - - ObjectOutputStream out = new ObjectOutputStream(boas); - out.writeObject(monitoringRecord); - out.close(); - - byte[] message2 = boas.toByteArray(); // TODO optimize - - System.arraycopy(message2, 0, validTracesMessages, - validTracesMessagesOffset, message2.length); - validTracesMessagesOffset += message2.length; - - if ((validTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO - // unsafe - // 200 - // || - // writeQueue.isEmpty() - Bits.putInt(validTracesMessages, validTracesMessagesOffset, -1); - sendMessage(validTracesMessages, "validTracesMaster"); - validTracesMessagesOffset = 0; - } - } - catch (final IOException e) { - LOG.error("Error sending record", e); - } - } - - /** - * This method represents the input port of this filter. - * - * @param event - * The next event. - */ - @InputPort(name = INPUT_PORT_NAME_INVALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") - public final void inputInvalidTraces(final Object monitoringRecord) { - try { - // if (monitoringRecord instanceof IMonitoringRecord) { - // byte[] message2 = toByteArray((IMonitoringRecord) - // monitoringRecord); - - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - - ObjectOutputStream out = new ObjectOutputStream(boas); - out.writeObject(monitoringRecord); - out.close(); - - byte[] message2 = boas.toByteArray(); // TODO optimize - - System.arraycopy(message2, 0, invalidTracesMessages, - invalidTracesMessagesOffset, message2.length); - invalidTracesMessagesOffset += message2.length; - - if ((invalidTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO - // unsafe - // 200 - // || - // writeQueue.isEmpty() - Bits.putInt(invalidTracesMessages, invalidTracesMessagesOffset, - -1); - sendMessage(invalidTracesMessages, "invalidTracesMaster"); - invalidTracesMessagesOffset = 0; - } - } - catch (final IOException e) { - LOG.error("Error sending record", e); - } - } - - @SuppressWarnings("unused") - private byte[] toByteArray(final IMonitoringRecord monitoringRecord) { - final Class<?>[] recordTypes = monitoringRecord.getValueTypes(); - - int arraySize = 4 + 8; - - for (Class<?> recordType : recordTypes) { - if (recordType == String.class) { - arraySize += 4; - } - else if ((recordType == Integer.class) || (recordType == int.class)) { - arraySize += 4; - } - else if ((recordType == Long.class) || (recordType == long.class)) { - arraySize += 8; - } - else if ((recordType == Float.class) || (recordType == float.class)) { - arraySize += 4; - } - else if ((recordType == Double.class) - || (recordType == double.class)) { - arraySize += 8; - } - else if ((recordType == Byte.class) || (recordType == byte.class)) { - arraySize += 1; - } - else if ((recordType == Short.class) || (recordType == short.class)) { - arraySize += 2; - } - else if ((recordType == Boolean.class) - || (recordType == boolean.class)) { - arraySize += 1; - } - else { - arraySize += 1; - } - } - - byte[] result = new byte[arraySize]; - int offset = 0; - - Bits.putInt(result, offset, getIdForString(monitoringRecord.getClass() - .getName())); - offset += 4; - - Bits.putLong(result, offset, monitoringRecord.getLoggingTimestamp()); - offset += 8; - final Object[] recordFields = monitoringRecord.toArray(); - - for (int i = 0; i < recordFields.length; i++) { - if (recordFields[i] == null) { - if (recordTypes[i] == String.class) { - Bits.putInt(result, offset, getIdForString("")); - offset += 4; - } - else if ((recordTypes[i] == Integer.class) - || (recordTypes[i] == int.class)) { - Bits.putInt(result, offset, 0); - offset += 4; - } - else if ((recordTypes[i] == Long.class) - || (recordTypes[i] == long.class)) { - Bits.putLong(result, offset, 0L); - offset += 8; - } - else if ((recordTypes[i] == Float.class) - || (recordTypes[i] == float.class)) { - Bits.putFloat(result, offset, 0); - offset += 4; - } - else if ((recordTypes[i] == Double.class) - || (recordTypes[i] == double.class)) { - Bits.putDouble(result, offset, 0); - offset += 8; - } - else if ((recordTypes[i] == Byte.class) - || (recordTypes[i] == byte.class)) { - Bits.putByte(result, offset, (byte) 0); - offset += 1; - } - else if ((recordTypes[i] == Short.class) - || (recordTypes[i] == short.class)) { - Bits.putShort(result, offset, (short) 0); - offset += 2; - } - else if ((recordTypes[i] == Boolean.class) - || (recordTypes[i] == boolean.class)) { - Bits.putBoolean(result, offset, false); - offset += 1; - } - else { - LOG.warn("Record with unsupported recordField of type " - + recordFields[i].getClass()); - Bits.putByte(result, offset, (byte) 0); - offset += 1; - } - } - else if (recordFields[i] instanceof String) { - Bits.putInt(result, offset, - getIdForString((String) recordFields[i])); - offset += 4; - } - else if (recordFields[i] instanceof Integer) { - Bits.putInt(result, offset, (Integer) recordFields[i]); - offset += 4; - } - else if (recordFields[i] instanceof Long) { - Bits.putLong(result, offset, (Long) recordFields[i]); - offset += 8; - } - else if (recordFields[i] instanceof Float) { - Bits.putFloat(result, offset, (Float) recordFields[i]); - offset += 4; - } - else if (recordFields[i] instanceof Double) { - Bits.putDouble(result, offset, (Double) recordFields[i]); - offset += 8; - } - else if (recordFields[i] instanceof Byte) { - Bits.putByte(result, offset, (Byte) recordFields[i]); - offset += 1; - } - else if (recordFields[i] instanceof Short) { - Bits.putShort(result, offset, (Short) recordFields[i]); - offset += 2; - } - else if (recordFields[i] instanceof Boolean) { - Bits.putBoolean(result, offset, (Boolean) recordFields[i]); - offset += 1; - } - else { - LOG.warn("Record with unsupported recordField of type " - + recordFields[i].getClass()); - Bits.putByte(result, offset, (byte) 0); - offset += 1; - } - } - - return result; - } - - public int getIdForString(String value) { - Integer result = stringReg.get(value); - - if (result == null) { - result = stringRegIndex.getAndIncrement(); - stringReg.put(value, result); - sendRegistryRecord(new StringRegistryRecord(result, value)); - } - - return result; - } - - private void sendMessage(final byte[] message, String queueName) - throws IOException { - - synchronized (this) { - if (!connection.isOpen() || !channel.isOpen()) { - connect(); - } - channel.basicPublish("", queueName, MessageProperties.BASIC, - message); - } - } - - protected final void cleanup() { - disconnect(); - } - - private void disconnect() { - try { - if (channel != null) { - channel.close(); - } - } - catch (final IOException e) { - LOG.info("Error closing connection", e); - } - - try { - if (connection != null) { - connection.close(); - } - } - catch (final IOException e) { - LOG.info("Error closing connection", e); - } - } - - @Override - public final String toString() { - final StringBuilder sb = new StringBuilder(128); - sb.append(super.toString()); - sb.append("; Channel: '"); - if (null != channel) { - sb.append(channel.toString()); - } - else { - sb.append("null"); - } - sb.append("'; Connection: '"); - if (null != connection) { - sb.append(connection.toString()); - } - else { - sb.append("null"); - } - sb.append('\''); - return sb.toString(); - } - - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, providerUrl); - configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queue); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); - configuration.setProperty(CONFIG_PROPERTY_USER, username); - return configuration; - } -} diff --git a/src/explorviz/hpc_monitoring/connector/TCPConnector.java b/src/explorviz/hpc_monitoring/connector/TCPConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..0f552c60c9a4e59148af2ab617d947eb543b134d --- /dev/null +++ b/src/explorviz/hpc_monitoring/connector/TCPConnector.java @@ -0,0 +1,216 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.connector; + +import java.io.*; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import explorviz.hpc_monitoring.byteaccess.UnsafeBits; + +/** + * A plugin used for kieker in the cloud. + * All incoming events are put into a RabbitMQ, but are also passed to an output + * port that can be used for + * testing purposes. + * + * @author Santje Finke + * + * @since 1.8 + * + */ +@Plugin(description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", outputPorts = { @OutputPort(name = TCPConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, description = "Provides each incoming object") }, configuration = { + @Property(name = TCPConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), + @Property(name = TCPConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), + @Property(name = TCPConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = TCPConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") }) +public class TCPConnector extends AbstractFilterPlugin { + private static final int MESSAGE_BUFFER_SIZE = 1500; + + /** + * The name of the input port receiving the incoming events. + */ + public static final String INPUT_PORT_NAME_INVALID_TRACES = "inputInvalidTraces"; + public static final String INPUT_PORT_NAME_VALID_TRACES = "inputValidTraces"; + + /** + * The name of the output port passing the incoming events. + */ + public static final String OUTPUT_PORT_NAME = "relayedEvents"; + /** + * The name of the property determining the address of the used Server. + */ + public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; + /** + * The name of the property determining the name of the Queue. + */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; + + /** + * The username that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** + * The password that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + + private static final Log LOG = LogFactory + .getLog(TCPConnector.class); + + private final String providerUrl; + + private final byte[] validTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; + private int validTracesMessagesOffset = 0; + + private final byte[] invalidTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; + private int invalidTracesMessagesOffset = 0; + + public TCPConnector(final Configuration configuration, + final IProjectContext projectContext) { + super(configuration, projectContext); + providerUrl = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); + try { + connect(); + } + catch (final IOException e) { + e.printStackTrace(); + } + } + + private void connect() throws IOException {} + + /** + * This method represents the input port of this filter. + * + * @param event + * The next event. + */ + @InputPort(name = INPUT_PORT_NAME_VALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") + public final void inputValidTraces(final Object monitoringRecord) { + try { + // if (monitoringRecord instanceof IMonitoringRecord) { + // byte[] message2 = toByteArray((IMonitoringRecord) + // monitoringRecord); + + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + + final ObjectOutputStream out = new ObjectOutputStream(boas); + out.writeObject(monitoringRecord); + out.close(); + + final byte[] message2 = boas.toByteArray(); // TODO optimize + + System.arraycopy(message2, 0, validTracesMessages, + validTracesMessagesOffset, message2.length); + validTracesMessagesOffset += message2.length; + + if ((validTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO + // unsafe + // 200 + // || + // writeQueue.isEmpty() + UnsafeBits.putInt(validTracesMessages, + validTracesMessagesOffset, -1); + sendMessage(validTracesMessages, "validTracesMaster"); + validTracesMessagesOffset = 0; + } + } + catch (final IOException e) { + LOG.error("Error sending record", e); + } + } + + /** + * This method represents the input port of this filter. + * + * @param event + * The next event. + */ + @InputPort(name = INPUT_PORT_NAME_INVALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") + public final void inputInvalidTraces(final Object monitoringRecord) { + try { + // if (monitoringRecord instanceof IMonitoringRecord) { + // byte[] message2 = toByteArray((IMonitoringRecord) + // monitoringRecord); + + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + + final ObjectOutputStream out = new ObjectOutputStream(boas); + out.writeObject(monitoringRecord); + out.close(); + + final byte[] message2 = boas.toByteArray(); // TODO optimize + + System.arraycopy(message2, 0, invalidTracesMessages, + invalidTracesMessagesOffset, message2.length); + invalidTracesMessagesOffset += message2.length; + + if ((invalidTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO + // unsafe + // 200 + // || + // writeQueue.isEmpty() + UnsafeBits.putInt(invalidTracesMessages, + invalidTracesMessagesOffset, -1); + sendMessage(invalidTracesMessages, "invalidTracesMaster"); + invalidTracesMessagesOffset = 0; + } + } + catch (final IOException e) { + LOG.error("Error sending record", e); + } + } + + @SuppressWarnings("unused") + private byte[] toByteArray(final IMonitoringRecord monitoringRecord) { + return new byte[1]; // TODO + } + + private void sendMessage(final byte[] message, final String queueName) + throws IOException { + + synchronized (this) {} + } + + protected final void cleanup() { + disconnect(); + } + + private void disconnect() { + + } + + @Override + public final String toString() { + final StringBuilder sb = new StringBuilder(128); + + return sb.toString(); + } + + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, providerUrl); + return configuration; + } +} diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 389c22c8011b05d6f91056140029f1c63e3571fd..a6bb4d63c8539c160db0613b09db93780aa1776f 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -30,7 +30,7 @@ import kieker.common.configuration.Configuration; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; import kieker.common.record.IMonitoringRecord; -import explorviz.hpc_monitoring.UnsafeBits; +import explorviz.hpc_monitoring.byteaccess.UnsafeBits; import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.events.*; @@ -120,40 +120,12 @@ public final class TCPReader extends AbstractReaderPlugin { // registryConsumer.start(); } - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * </ul> - * - * @throws IllegalArgumentException - * If one of the properties is empty. - * - * @deprecated To be removed in Kieker 1.8. - */ - @Deprecated - public TCPReader(final Configuration configuration) - throws IllegalArgumentException { - this(configuration, null); - } - /** * A call to this method is a blocking call. * * @return true if the method succeeds, false otherwise. */ + @Override public boolean read() { boolean retVal = true; try { @@ -161,13 +133,13 @@ public final class TCPReader extends AbstractReaderPlugin { while (active) { // TODO only one connection! final Socket socket = serversocket.accept(); - BufferedInputStream bufferedInputStream = new BufferedInputStream( + final BufferedInputStream bufferedInputStream = new BufferedInputStream( socket.getInputStream(), MESSAGE_BUFFER_SIZE); int readSize = 0; int toReadOffset = 0; while ((readSize = bufferedInputStream.read(messages, toReadOffset, MESSAGE_BUFFER_SIZE - toReadOffset)) != -1) { - byte[] unreadBytes = messagesfromByteArray(messages, + final byte[] unreadBytes = messagesfromByteArray(messages, readSize + toReadOffset); if (unreadBytes != null) { toReadOffset = unreadBytes.length; @@ -202,7 +174,7 @@ public final class TCPReader extends AbstractReaderPlugin { serversocket = new ServerSocket(port); } - private byte[] messagesfromByteArray(final byte[] b, int readSize) { + private byte[] messagesfromByteArray(final byte[] b, final int readSize) { int offset = 0; while (offset < readSize) { @@ -213,38 +185,14 @@ public final class TCPReader extends AbstractReaderPlugin { final int clazzId = UnsafeBits.getInt(b, offset); offset += 4; + IMonitoringRecord record = null; + switch (clazzId) { case 0: { - if ((readSize - offset) < (8 + 4 + 8 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - } - case 1: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - } - case 2: { - if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - } - case 3: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - } - case 4: { - if ((readSize - offset) < (4 + 4)) { + if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) { return createUnreadBytesArray(b, readSize, offset, true); } - } - } - IMonitoringRecord record = null; - - switch (clazzId) { - case 0: { final long traceId = UnsafeBits.getLong(b, offset); offset += 8; final Integer hostnameId = UnsafeBits.getInt(b, offset); @@ -253,13 +201,19 @@ public final class TCPReader extends AbstractReaderPlugin { offset += 8; final int parentOrderId = UnsafeBits.getInt(b, offset); offset += 4; + final Integer applicationId = UnsafeBits.getInt(b, offset); + offset += 4; record = new Trace(traceId, getStringFromRegistry(hostnameId), parentTraceId, - parentOrderId); + parentOrderId, getStringFromRegistry(applicationId)); break; } case 1: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; final long traceId = UnsafeBits.getLong(b, offset); @@ -274,6 +228,10 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 2: { + if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; final long traceId = UnsafeBits.getLong(b, offset); @@ -291,6 +249,10 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 3: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; final long traceId = UnsafeBits.getLong(b, offset); @@ -305,6 +267,10 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 4: { + if ((readSize - offset) < (4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + final Integer mapId = UnsafeBits.getInt(b, offset); offset += 4; final int stringLength = UnsafeBits.getInt(b, offset); @@ -315,9 +281,9 @@ public final class TCPReader extends AbstractReaderPlugin { true); } - byte[] stringBytes = new byte[stringLength]; + final byte[] stringBytes = new byte[stringLength]; System.arraycopy(b, offset, stringBytes, 0, stringLength); - String string = new String(stringBytes); + final String string = new String(stringBytes); offset += stringLength; addToRegistry(mapId, string); @@ -338,8 +304,8 @@ public final class TCPReader extends AbstractReaderPlugin { return null; } - private byte[] createUnreadBytesArray(final byte[] b, int readSize, - int offset, boolean withClazzId) { + private byte[] createUnreadBytesArray(final byte[] b, final int readSize, + int offset, final boolean withClazzId) { if (withClazzId) { offset -= 4; } @@ -356,6 +322,7 @@ public final class TCPReader extends AbstractReaderPlugin { /** * {@inheritDoc} */ + @Override public void terminate(final boolean error) { LOG.info("Shutdown of RabbitMQReader requested."); unblock(); @@ -374,7 +341,7 @@ public final class TCPReader extends AbstractReaderPlugin { return configuration; } - public void addToRegistry(Integer key, String value) { + public void addToRegistry(final Integer key, final String value) { stringRegistry.put(key, value); synchronized (this) { @@ -382,7 +349,7 @@ public final class TCPReader extends AbstractReaderPlugin { } } - private String getStringFromRegistry(Integer id) { + private String getStringFromRegistry(final Integer id) { String result = stringRegistry.get(id); while (result == null) { try { @@ -391,7 +358,7 @@ public final class TCPReader extends AbstractReaderPlugin { this.wait(); } } - catch (InterruptedException e) { + catch (final InterruptedException e) { e.printStackTrace(); } result = stringRegistry.get(id); diff --git a/src/explorviz/hpc_monitoring/record/Trace.java b/src/explorviz/hpc_monitoring/record/Trace.java index 292dceaf27dd48743160a3e4710e177d7ce44afb..2c631c42ea02f9a33b41062386f765399d151bcd 100644 --- a/src/explorviz/hpc_monitoring/record/Trace.java +++ b/src/explorviz/hpc_monitoring/record/Trace.java @@ -10,13 +10,16 @@ public class Trace implements IFlowRecord { private final String hostname; private final long parentTraceId; private final int parentOrderId; + private final String application; - public Trace(long traceId, String hostname, long parentTraceId, - int parentOrderId) { + public Trace(final long traceId, final String hostname, + final long parentTraceId, final int parentOrderId, + final String application) { this.traceId = traceId; this.hostname = hostname; this.parentTraceId = parentTraceId; this.parentOrderId = parentOrderId; + this.application = application; } @Override @@ -30,12 +33,12 @@ public class Trace implements IFlowRecord { } @Override - public void initFromArray(Object[] arg0) { + public void initFromArray(final Object[] arg0) { throw new UnsupportedOperationException(); } @Override - public void setLoggingTimestamp(long arg0) { + public void setLoggingTimestamp(final long arg0) { throw new UnsupportedOperationException(); } @@ -45,7 +48,7 @@ public class Trace implements IFlowRecord { } @Override - public int compareTo(IMonitoringRecord o) { + public int compareTo(final IMonitoringRecord o) { throw new UnsupportedOperationException(); } @@ -57,10 +60,6 @@ public class Trace implements IFlowRecord { return hostname; } - public String getApplication() { - return "MonitoredApplication"; // TODO - } - public long getParentTraceId() { return parentTraceId; } @@ -68,4 +67,9 @@ public class Trace implements IFlowRecord { public int getParentOrderId() { return parentOrderId; } + + public String getApplication() { + return application; + } + } diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index 05b43cafabd867a150fa4e90f88308bd4c682c84..ff55fb9618b7a6c57324fad2a9a3c44d111470b5 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -10,55 +10,92 @@ import explorviz.hpc_monitoring.filter.EventRecordTraceReconstructionFilter import explorviz.hpc_monitoring.filter.TraceEventRecordAggregationFilter import explorviz.hpc_monitoring.reader.TCPReader import explorviz.hpc_monitoring.filter.CountingThroughputFilter +import explorviz.hpc_monitoring.connector.TCPConnector class WorkerController { var IAnalysisController analysisInstance - def start() { + def startWithCountingRecordsThroughput() { analysisInstance = new AnalysisController() val tcpReader = initTCPReader() - val countingThroughputFilter = initCountingThroughputFilter() - val teeFilter = initTeeFilter() - analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, - CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - - analysisInstance.connect(countingThroughputFilter, - CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, - TeeFilter::INPUT_PORT_NAME_EVENTS) - -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, -// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) - -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector, -// RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) -// -// analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, -// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) -// analysisInstance.connect(aggregationFilter, -// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector, -// RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) + val countingThroughputFilter = initCountingThroughputFilter() + val teeFilter = initTeeFilter() + analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, + CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + + analysisInstance.connect(countingThroughputFilter, + CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, + TeeFilter::INPUT_PORT_NAME_EVENTS) + try { analysisInstance.run() } catch (Exception e) { e.printStackTrace } } - - def teeOutput() { - // val countingThroughputFilter = initCountingThroughputFilter() - // val teeFilter = initTeeFilter() - // analysisInstance.connect(eventTraceReconstructionFilter, - // EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, - // CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - // - // analysisInstance.connect(countingThroughputFilter, - // CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, - // TeeFilter::INPUT_PORT_NAME_EVENTS) + + def startWithCountingTracesThroughput() { + analysisInstance = new AnalysisController() + + val tcpReader = initTCPReader() + + val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() + val countingThroughputFilter = initCountingThroughputFilter() + val teeFilter = initTeeFilter() + + analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) + + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, + CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + + analysisInstance.connect(countingThroughputFilter, + CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, + TeeFilter::INPUT_PORT_NAME_EVENTS) + + try { + analysisInstance.run() + } catch (Exception e) { + e.printStackTrace + } + } + + def startNormalWorker() { + analysisInstance = new AnalysisController() + + val tcpReader = initTCPReader() + val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() + + val aggregationFilter = initAggregationFilter() + + val timer = initTimer() + val tcpConnector = initTCPConnector() + + analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) + + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector, + TCPConnector::INPUT_PORT_NAME_INVALID_TRACES) + + analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) + analysisInstance.connect(aggregationFilter, + TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector, + TCPConnector::INPUT_PORT_NAME_VALID_TRACES) + + try { + analysisInstance.run() + } catch (Exception e) { + e.printStackTrace + } } def initTCPReader() { @@ -67,11 +104,9 @@ class WorkerController { new TCPReader(config, analysisInstance) } - def initTCPConnector() { -// val rabbitConfig = new Configuration() -// rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_PROVIDER, "localhost") -// rabbitConfig.setProperty(RabbitMQConnector::CONFIG_PROPERTY_NAME_QUEUE, "validTraces") -// new RabbitMQConnector(rabbitConfig, analysisInstance) + def initEventRecordTraceReconstructionFilter() { + val config = new Configuration() + new EventRecordTraceReconstructionFilter(config, analysisInstance) } def initAggregationFilter() { @@ -92,13 +127,14 @@ class WorkerController { new CountingThroughputFilter(config, analysisInstance) } - def initEventRecordTraceReconstructionFilter() { + def initTeeFilter() { val config = new Configuration() - new EventRecordTraceReconstructionFilter(config, analysisInstance) + new TeeFilter(config, analysisInstance) } - def initTeeFilter() { + def initTCPConnector() { val config = new Configuration() - new TeeFilter(config, analysisInstance) + config.setProperty(TCPConnector::CONFIG_PROPERTY_NAME_PROVIDER, "127.0.0.1") + new TCPConnector(config, analysisInstance) } } diff --git a/src/explorviz/worker/main/WorkerStarter.java b/src/explorviz/worker/main/WorkerStarter.java index 81bbbf5eaa3501761b070575abbae63f30a710e9..7db7154cce523380f417fad2e7d3488cdd4a8257 100644 --- a/src/explorviz/worker/main/WorkerStarter.java +++ b/src/explorviz/worker/main/WorkerStarter.java @@ -1,8 +1,8 @@ package explorviz.worker.main; public class WorkerStarter { - - public static void main(String[] args) { - new WorkerController().start(); - } + + public static void main(String[] args) { + new WorkerController().startWithCountingRecordsThroughput(); + } } diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java deleted file mode 100644 index 4767e8243282dbc70e628797af8fe44a9fc582d4..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java +++ /dev/null @@ -1,405 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.reader.mq; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.reader.AbstractReaderPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.exception.MonitoringRecordException; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.AbstractMonitoringRecord; -import kieker.common.record.IMonitoringRecord; -import com.rabbitmq.client.*; -import explorviz.hpc_monitoring.Bits; - -/** - * Reads monitoring records from the queue of an established RabbitMQ - * connection. - * - * @author Santje Finke - * - * @since 1.8 - */ -@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = { - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") - -}) -public final class RabbitMQReader extends AbstractReaderPlugin { - - /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - /** The name of the configuration determining the RabbitMQ provider URL. */ - public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; - /** - * The name of the configuration determining the RabbitMQ Queue (e.g. - * queue1). - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; - /** The username that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** The password that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; - /** The port that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PORT = "5672"; - - static final Log LOG = LogFactory - .getLog(RabbitMQReader.class); // NOPMD - // package - // for - // inner - // class - - private final String providerUrl; - private final String queueName; - private final String password; - private final String username; - private final int port; - private Connection connection; - private Channel channel; - private ConnectionFactory factory; - private QueueingConsumer normalConsumer; - - private final CountDownLatch cdLatch = new CountDownLatch( - 1); - - private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(); - - private final RabbitMQRegistryConsumer registryConsumer; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. - * {@code port} - * </ul> - * @param projectContext - * The project context for this component. - * - * @throws IllegalArgumentException - * If one of the properties is empty. - */ - public RabbitMQReader(final Configuration configuration, - final IProjectContext projectContext) - throws IllegalArgumentException { - super(configuration, projectContext); - - // Initialize the reader bases on the given configuration. - providerUrl = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); - queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); - username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); - port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); - // simple sanity check - if ((providerUrl.length() == 0) || (queueName.length() == 0)) { - throw new IllegalArgumentException( - "RabbitMQReader has not sufficient parameters. providerUrl ('" - + providerUrl + "') or destination ('" + queueName - + "'), is null"); - } - - registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, - "registryRecords", username, password, port); - registryConsumer.start(); - } - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * </ul> - * - * @throws IllegalArgumentException - * If one of the properties is empty. - * - * @deprecated To be removed in Kieker 1.8. - */ - @Deprecated - public RabbitMQReader(final Configuration configuration) - throws IllegalArgumentException { - this(configuration, null); - } - - /** - * A call to this method is a blocking call. - * - * @return true if the method succeeds, false otherwise. - */ - public boolean read() { - boolean retVal = true; - try { - createConnectionFactory(); - connect(); - while (!Thread.interrupted()) { - - if (!connection.isOpen() || !channel.isOpen()) { - reconnect(); - } - - final QueueingConsumer.Delivery delivery = normalConsumer - .nextDelivery(); - - byte[] batchedMessages = delivery.getBody(); - messagesfromByteArray(batchedMessages, 0); - - normalConsumer.getChannel().basicAck( - delivery.getEnvelope().getDeliveryTag(), false); - } - } - catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) - LOG.error("Error in read()", ex); - retVal = false; - } - catch (final ShutdownSignalException e) { - LOG.error("Error in read(): ShutdownSignal Exception", e); - retVal = false; - } - catch (final ConsumerCancelledException e) { - LOG.error("Error in read(): ConsumerCancelled Exception", e); - retVal = false; - } - catch (final InterruptedException e) { - LOG.error("Error in read(): Interrupted Exception", e); - retVal = false; - } - finally { - try { - if (connection != null) { - connection.close(); - } - } - catch (final IOException e) { - LOG.error("Error in read()", e); - } - } - return retVal; - } - - private void createConnectionFactory() { - factory = new ConnectionFactory(); - factory.setHost(providerUrl); - factory.setPort(port); - factory.setConnectionTimeout(0); - factory.setUsername(username); - factory.setPassword(password); - } - - private void connect() throws IOException { - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.queueDeclare(queueName, false, false, false, null); - - normalConsumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, false, normalConsumer); - channel.basicQos(10); - } - - private void reconnect() { - try { - connect(); - } - catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); - } - } - - private void messagesfromByteArray(final byte[] b, int initOffset) { - int offset = initOffset; - - while (Bits.getInt(b, offset) != -1) { - int firstValue = Bits.getInt(b, offset); - offset += 4; - - String classname = getStringFromRegistry(firstValue); - - Class<? extends IMonitoringRecord> clazz = null; - Class<?>[] recordTypes = null; - try { - clazz = AbstractMonitoringRecord.classForName(classname); - recordTypes = AbstractMonitoringRecord.typesForClass(clazz); - } - catch (MonitoringRecordException e) { - LOG.error("could not create record", e); - } - - final long loggingTimestamp = Bits.getLong(b, offset); - offset += 8; - - final Object[] values = new Object[recordTypes.length]; - int valueIndex = 0; - - for (Class<?> recordType : recordTypes) { - if (recordType == String.class) { - values[valueIndex] = getStringFromRegistry(Bits.getInt(b, - offset)); - offset += 4; - valueIndex++; - } - else if ((recordType == Integer.class) - || (recordType == int.class)) { - values[valueIndex] = Bits.getInt(b, offset); - offset += 4; - valueIndex++; - } - else if ((recordType == Long.class) - || (recordType == long.class)) { - values[valueIndex] = Bits.getLong(b, offset); - offset += 8; - valueIndex++; - } - else if ((recordType == Float.class) - || (recordType == float.class)) { - values[valueIndex] = Bits.getFloat(b, offset); - offset += 4; - valueIndex++; - } - else if ((recordType == Double.class) - || (recordType == double.class)) { - values[valueIndex] = Bits.getDouble(b, offset); - offset += 8; - valueIndex++; - } - else if ((recordType == Byte.class) - || (recordType == byte.class)) { - values[valueIndex] = Bits.getByte(b, offset); - offset += 1; - valueIndex++; - } - else if ((recordType == Short.class) - || (recordType == short.class)) { - values[valueIndex] = Bits.getShort(b, offset); - offset += 2; - valueIndex++; - } - else if ((recordType == boolean.class) - || (recordType == Boolean.class)) { - values[valueIndex] = Bits.getBoolean(b, offset); - offset += 1; - valueIndex++; - } - else { - values[valueIndex] = Bits.getByte(b, offset); - offset += 1; - valueIndex++; - } - } - IMonitoringRecord record = null; - try { - record = AbstractMonitoringRecord - .createFromArray(clazz, values); - } - catch (MonitoringRecordException e) { - LOG.error("could not create record", e); - } - record.setLoggingTimestamp(loggingTimestamp); - - if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { - LOG.error("deliverRecord returned false"); - } - } - } - - final void unblock() { // NOPMD (package visible for inner class) - cdLatch.countDown(); - } - - /** - * {@inheritDoc} - */ - public void terminate(final boolean error) { - LOG.info("Shutdown of RabbitMQReader requested."); - registryConsumer.interrupt(); - unblock(); - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - - configuration - .setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl); - configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName); - configuration.setProperty(CONFIG_PROPERTY_USER, username); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); - - return configuration; - } - - public void addToRegistry(Integer key, String value) { - stringRegistry.put(key, value); - - synchronized (this) { - notifyAll(); - } - } - - private String getStringFromRegistry(Integer id) { - String result = stringRegistry.get(id); - while (result == null) { - try { - synchronized (this) { - this.wait(); - } - } - catch (InterruptedException e) { - e.printStackTrace(); - } - result = stringRegistry.get(id); - } - - return result; - } -} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java deleted file mode 100644 index 357caa0aa53f6f74ff91911a7ca30056652bf2e9..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java +++ /dev/null @@ -1,124 +0,0 @@ -package kieker.analysis.plugin.reader.mq; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; - -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.misc.RegistryRecord; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ConsumerCancelledException; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.ShutdownSignalException; - -public class RabbitMQRegistryConsumer extends Thread { - - static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD package for inner class - - private final RabbitMQReader parent; - - private Connection connection; - private Channel channel; - private ConnectionFactory factory; - private QueueingConsumer registryConsumer; - - private String providerUrl; - private String queueName; - private String password; - private String username; - private int port; - - public RabbitMQRegistryConsumer(RabbitMQReader parent, String providerUrl, String queueName, String password, String username, int port) { - this.parent = parent; - this.providerUrl = providerUrl; - this.queueName = queueName; - this.password = password; - this.username = username; - this.port = port; - - } - - @Override - public void run() { - try { - createConnectionFactory(); - - connect(); - - while (!Thread.interrupted()) { - - if (!this.connection.isOpen() || !this.channel.isOpen()) { - this.reconnect(); - } - - final QueueingConsumer.Delivery delivery = this.registryConsumer.nextDelivery(); - - final Object message = readObjectFromBytes(delivery.getBody()); - - - if (message instanceof RegistryRecord) { - @SuppressWarnings("unchecked") - final RegistryRecord<String> record = (RegistryRecord<String>) message; - parent.addToRegistry(record.getId(), record.getObject()); - } - } - } catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) - LOG.error("Error in read()", ex); - } catch (final ClassNotFoundException e) { - LOG.error("Error in read(): ClassNotFound Exception", e); - } catch (final ShutdownSignalException e) { - LOG.error("Error in read(): ShutdownSignal Exception", e); - } catch (final ConsumerCancelledException e) { - LOG.error("Error in read(): ConsumerCancelled Exception", e); - } catch (final InterruptedException e) { - LOG.error("Error in read(): Interrupted Exception", e); - } finally { - try { - if (this.connection != null) { - this.connection.close(); - } - } catch (final IOException e) { - LOG.error("Error in read()", e); - } - } - } - - private void createConnectionFactory() { - this.factory = new ConnectionFactory(); - this.factory.setHost(this.providerUrl); - this.factory.setPort(this.port); - this.factory.setConnectionTimeout(0); - this.factory.setUsername(this.username); - this.factory.setPassword(this.password); - } - - private void connect() throws IOException { - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - - this.channel.queueDeclare(this.queueName, false, false, false, null); - - this.registryConsumer = new QueueingConsumer(this.channel); - this.channel.basicConsume(this.queueName, true, this.registryConsumer); - } - - private void reconnect() { - try { - connect(); - } catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); - } - } - - private Object readObjectFromBytes(final byte[] bs) - throws IOException, ClassNotFoundException { - final ByteArrayInputStream bain = new ByteArrayInputStream(bs); - final ObjectInputStream in = new ObjectInputStream(bain); - final Object message = in.readObject(); - return message; - } -}