diff --git a/.classpath b/.classpath index 4d76e2942af91530759af460bbce2ef4c614fd16..f0fe3356694031424a2c4dfdf88b0a0bc1d92f3b 100644 --- a/.classpath +++ b/.classpath @@ -2,9 +2,10 @@ <classpath> <classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/> <classpathentry kind="src" path="xtend-gen"/> - <classpathentry excluding="kieker/analysis/plugin/filter/flow/" kind="src" path="src"/> + <classpathentry kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="lib" path="lib/rabbitmq-client.jar"/> - <classpathentry combineaccessrules="false" kind="src" path="/kieker"/> + <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/> + <classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/> <classpathentry kind="output" path="bin"/> </classpath> diff --git a/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..2f1f274153288c53b911caf282d3c9423a315514 --- /dev/null +++ b/src/explorviz/hpc_monitoring/connector/RabbitMQConnector.java @@ -0,0 +1,459 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.connector; + +import java.io.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.Bits; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import com.rabbitmq.client.*; +import explorviz.hpc_monitoring.StringRegistryRecord; + +/** + * A plugin used for kieker in the cloud. + * All incoming events are put into a RabbitMQ, but are also passed to an output + * port that can be used for + * testing purposes. + * + * @author Santje Finke + * + * @since 1.8 + * + */ +@Plugin(description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", outputPorts = { @OutputPort(name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, description = "Provides each incoming object") }, configuration = { + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") }) +public class RabbitMQConnector extends AbstractFilterPlugin { + private static final int MESSAGE_BUFFER_SIZE = 1500; + + /** + * The name of the input port receiving the incoming events. + */ + public static final String INPUT_PORT_NAME_INVALID_TRACES = "inputInvalidTraces"; + public static final String INPUT_PORT_NAME_VALID_TRACES = "inputValidTraces"; + + /** + * The name of the output port passing the incoming events. + */ + public static final String OUTPUT_PORT_NAME = "relayedEvents"; + /** + * The name of the property determining the address of the used Server. + */ + public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; + /** + * The name of the property determining the name of the Queue. + */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; + + /** + * The username that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** + * The password that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + + private static final Log LOG = LogFactory + .getLog(RabbitMQConnector.class); + + private ConnectionFactory factory; + private Connection connection; + + private final String providerUrl; + private Channel channel; + private final String queue; + + private String username = "guest"; + private String password = "guest"; + + private final byte[] validTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; + private int validTracesMessagesOffset = 0; + + private final byte[] invalidTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; + private int invalidTracesMessagesOffset = 0; + + private final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>( + 16, + 0.75f, + 2); + + private final AtomicInteger stringRegIndex = new AtomicInteger( + 0); + + public RabbitMQConnector(final Configuration configuration, + final IProjectContext projectContext) { + super(configuration, projectContext); + providerUrl = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); + queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + username = configuration.getStringProperty(CONFIG_PROPERTY_USER); + password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + createConnectionFactory(providerUrl); + try { + connect(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + private void createConnectionFactory(final String provider) { + factory = new ConnectionFactory(); + factory.setHost(provider); + factory.setConnectionTimeout(0); + factory.setUsername(username); + factory.setPassword(password); + } + + private void connect() throws IOException { + connection = factory.newConnection(); + channel = connection.createChannel(); + channel.queueDeclare("validTracesMaster", false, false, false, null); + channel.queueDeclare("invalidTracesMaster", false, false, false, null); + channel.queueDeclare("registryRecordsMaster", false, false, false, null); + } + + private void sendRegistryRecord(StringRegistryRecord record) { + // TODO propagate to each new rabbitMQ queue + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + + ObjectOutputStream out; + try { + out = new ObjectOutputStream(boas); + out.writeObject(record); + out.close(); + + byte[] message2 = boas.toByteArray(); + sendMessage(message2, "registryRecordsMaster"); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * This method represents the input port of this filter. + * + * @param event + * The next event. + */ + @InputPort(name = INPUT_PORT_NAME_VALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") + public final void inputValidTraces(final Object monitoringRecord) { + try { + // if (monitoringRecord instanceof IMonitoringRecord) { + // byte[] message2 = toByteArray((IMonitoringRecord) + // monitoringRecord); + + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + + ObjectOutputStream out = new ObjectOutputStream(boas); + out.writeObject(monitoringRecord); + out.close(); + + byte[] message2 = boas.toByteArray(); // TODO optimize + + System.arraycopy(message2, 0, validTracesMessages, + validTracesMessagesOffset, message2.length); + validTracesMessagesOffset += message2.length; + + if ((validTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO + // unsafe + // 200 + // || + // writeQueue.isEmpty() + Bits.putInt(validTracesMessages, validTracesMessagesOffset, -1); + sendMessage(validTracesMessages, "validTracesMaster"); + validTracesMessagesOffset = 0; + } + } + catch (final IOException e) { + LOG.error("Error sending record", e); + } + } + + /** + * This method represents the input port of this filter. + * + * @param event + * The next event. + */ + @InputPort(name = INPUT_PORT_NAME_INVALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") + public final void inputInvalidTraces(final Object monitoringRecord) { + try { + // if (monitoringRecord instanceof IMonitoringRecord) { + // byte[] message2 = toByteArray((IMonitoringRecord) + // monitoringRecord); + + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + + ObjectOutputStream out = new ObjectOutputStream(boas); + out.writeObject(monitoringRecord); + out.close(); + + byte[] message2 = boas.toByteArray(); // TODO optimize + + System.arraycopy(message2, 0, invalidTracesMessages, + invalidTracesMessagesOffset, message2.length); + invalidTracesMessagesOffset += message2.length; + + if ((invalidTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO + // unsafe + // 200 + // || + // writeQueue.isEmpty() + Bits.putInt(invalidTracesMessages, invalidTracesMessagesOffset, + -1); + sendMessage(invalidTracesMessages, "invalidTracesMaster"); + invalidTracesMessagesOffset = 0; + } + } + catch (final IOException e) { + LOG.error("Error sending record", e); + } + } + + @SuppressWarnings("unused") + private byte[] toByteArray(final IMonitoringRecord monitoringRecord) { + final Class<?>[] recordTypes = monitoringRecord.getValueTypes(); + + int arraySize = 4 + 8; + + for (Class<?> recordType : recordTypes) { + if (recordType == String.class) { + arraySize += 4; + } + else if ((recordType == Integer.class) || (recordType == int.class)) { + arraySize += 4; + } + else if ((recordType == Long.class) || (recordType == long.class)) { + arraySize += 8; + } + else if ((recordType == Float.class) || (recordType == float.class)) { + arraySize += 4; + } + else if ((recordType == Double.class) + || (recordType == double.class)) { + arraySize += 8; + } + else if ((recordType == Byte.class) || (recordType == byte.class)) { + arraySize += 1; + } + else if ((recordType == Short.class) || (recordType == short.class)) { + arraySize += 2; + } + else if ((recordType == Boolean.class) + || (recordType == boolean.class)) { + arraySize += 1; + } + else { + arraySize += 1; + } + } + + byte[] result = new byte[arraySize]; + int offset = 0; + + Bits.putInt(result, offset, getIdForString(monitoringRecord.getClass() + .getName())); + offset += 4; + + Bits.putLong(result, offset, monitoringRecord.getLoggingTimestamp()); + offset += 8; + final Object[] recordFields = monitoringRecord.toArray(); + + for (int i = 0; i < recordFields.length; i++) { + if (recordFields[i] == null) { + if (recordTypes[i] == String.class) { + Bits.putInt(result, offset, getIdForString("")); + offset += 4; + } + else if ((recordTypes[i] == Integer.class) + || (recordTypes[i] == int.class)) { + Bits.putInt(result, offset, 0); + offset += 4; + } + else if ((recordTypes[i] == Long.class) + || (recordTypes[i] == long.class)) { + Bits.putLong(result, offset, 0L); + offset += 8; + } + else if ((recordTypes[i] == Float.class) + || (recordTypes[i] == float.class)) { + Bits.putFloat(result, offset, 0); + offset += 4; + } + else if ((recordTypes[i] == Double.class) + || (recordTypes[i] == double.class)) { + Bits.putDouble(result, offset, 0); + offset += 8; + } + else if ((recordTypes[i] == Byte.class) + || (recordTypes[i] == byte.class)) { + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } + else if ((recordTypes[i] == Short.class) + || (recordTypes[i] == short.class)) { + Bits.putShort(result, offset, (short) 0); + offset += 2; + } + else if ((recordTypes[i] == Boolean.class) + || (recordTypes[i] == boolean.class)) { + Bits.putBoolean(result, offset, false); + offset += 1; + } + else { + LOG.warn("Record with unsupported recordField of type " + + recordFields[i].getClass()); + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } + } + else if (recordFields[i] instanceof String) { + Bits.putInt(result, offset, + getIdForString((String) recordFields[i])); + offset += 4; + } + else if (recordFields[i] instanceof Integer) { + Bits.putInt(result, offset, (Integer) recordFields[i]); + offset += 4; + } + else if (recordFields[i] instanceof Long) { + Bits.putLong(result, offset, (Long) recordFields[i]); + offset += 8; + } + else if (recordFields[i] instanceof Float) { + Bits.putFloat(result, offset, (Float) recordFields[i]); + offset += 4; + } + else if (recordFields[i] instanceof Double) { + Bits.putDouble(result, offset, (Double) recordFields[i]); + offset += 8; + } + else if (recordFields[i] instanceof Byte) { + Bits.putByte(result, offset, (Byte) recordFields[i]); + offset += 1; + } + else if (recordFields[i] instanceof Short) { + Bits.putShort(result, offset, (Short) recordFields[i]); + offset += 2; + } + else if (recordFields[i] instanceof Boolean) { + Bits.putBoolean(result, offset, (Boolean) recordFields[i]); + offset += 1; + } + else { + LOG.warn("Record with unsupported recordField of type " + + recordFields[i].getClass()); + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } + } + + return result; + } + + public int getIdForString(String value) { + Integer result = stringReg.get(value); + + if (result == null) { + result = stringRegIndex.getAndIncrement(); + stringReg.put(value, result); + sendRegistryRecord(new StringRegistryRecord(result, value)); + } + + return result; + } + + private void sendMessage(final byte[] message, String queueName) + throws IOException { + + synchronized (this) { + if (!connection.isOpen() || !channel.isOpen()) { + connect(); + } + channel.basicPublish("", queueName, MessageProperties.BASIC, + message); + } + } + + protected final void cleanup() { + disconnect(); + } + + private void disconnect() { + try { + if (channel != null) { + channel.close(); + } + } + catch (final IOException e) { + LOG.info("Error closing connection", e); + } + + try { + if (connection != null) { + connection.close(); + } + } + catch (final IOException e) { + LOG.info("Error closing connection", e); + } + } + + @Override + public final String toString() { + final StringBuilder sb = new StringBuilder(128); + sb.append(super.toString()); + sb.append("; Channel: '"); + if (null != channel) { + sb.append(channel.toString()); + } + else { + sb.append("null"); + } + sb.append("'; Connection: '"); + if (null != connection) { + sb.append(connection.toString()); + } + else { + sb.append("null"); + } + sb.append('\''); + return sb.toString(); + } + + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, providerUrl); + configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queue); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); + configuration.setProperty(CONFIG_PROPERTY_USER, username); + return configuration; + } +} diff --git a/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..5bef4cb5776a595f44a29296acff6c3d021f2590 --- /dev/null +++ b/src/explorviz/hpc_monitoring/plugin/EventRecordTraceReconstructionFilter.java @@ -0,0 +1,421 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.plugin; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.flow.IFlowRecord; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.TraceEventRecords; +import explorviz.hpc_monitoring.record.events.*; + +/** + * @author Jan Waller + * + * @since 1.6 + */ +@Plugin(name = "Trace Reconstruction Filter (Event)", description = "Filter to reconstruct event based (flow) traces", outputPorts = { + @OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, description = "Outputs valid traces", eventTypes = { TraceEventRecords.class }), + @OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, description = "Outputs traces missing crucial records", eventTypes = { TraceEventRecords.class }) }, configuration = { + @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME), + @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME) }) +public final class EventRecordTraceReconstructionFilter extends + AbstractFilterPlugin { + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_TRACE_VALID = "validTraces"; + /** + * The name of the output port delivering the invalid traces. + */ + public static final String OUTPUT_PORT_NAME_TRACE_INVALID = "invalidTraces"; + /** + * The name of the input port receiving the trace records. + */ + public static final String INPUT_PORT_NAME_TRACE_RECORDS = "traceRecords"; + + /** + * The name of the property determining the time unit. + */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + /** + * The name of the property determining the maximal trace duration. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration"; + /** + * The name of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout"; + /** + * The default value of the properties for the maximal trace duration and + * timeout. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE) + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + private static final Log LOG = LogFactory + .getLog(EventRecordTraceReconstructionFilter.class); + + private final TimeUnit timeunit; + private final long maxTraceDuration; + private final long maxTraceTimeout; + private final boolean timeout; + private long maxEncounteredLoggingTimestamp = -1; + + private final Map<Long, TraceBuffer> traceId2trace; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public EventRecordTraceReconstructionFilter( + final Configuration configuration, + final IProjectContext projectContext) { + super(configuration, projectContext); + + final String recordTimeunitProperty = projectContext + .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } + catch (final IllegalArgumentException ex) { // already caught in + // AnalysisController, + // should never happen + LOG.warn(recordTimeunitProperty + + " is no valid TimeUnit! Using NANOSECONDS instead."); + recordTimeunit = TimeUnit.NANOSECONDS; + } + timeunit = recordTimeunit; + + final String configTimeunitProperty = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); + TimeUnit configTimeunit; + try { + configTimeunit = TimeUnit.valueOf(configTimeunitProperty); + } + catch (final IllegalArgumentException ex) { + LOG.warn(configTimeunitProperty + + " is no valid TimeUnit! Using inherited value of " + + timeunit.name() + " instead."); + configTimeunit = timeunit; + } + + maxTraceDuration = timeunit.convert(configuration + .getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION), + configTimeunit); + maxTraceTimeout = timeunit.convert(configuration + .getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT), + configTimeunit); + timeout = !((maxTraceTimeout == Long.MAX_VALUE) && (maxTraceDuration == Long.MAX_VALUE)); + traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); + } + + /** + * This method is the input port for the new events for this filter. + * + * @param record + * The new record to handle. + */ + @InputPort(name = INPUT_PORT_NAME_TRACE_RECORDS, description = "Reconstruct traces from incoming flow records", eventTypes = { + Trace.class, AbstractOperationEvent.class }) + public void newEvent(final IFlowRecord record) { + final Long traceId; + TraceBuffer traceBuffer; + final long loggingTimestamp; + if (record instanceof Trace) { + traceId = ((Trace) record).getTraceId(); + traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceBuffer(); + traceId2trace.put(traceId, traceBuffer); + } + } + } + traceBuffer.setTrace((Trace) record); + loggingTimestamp = -1; + } + else if (record instanceof AbstractOperationEvent) { + traceId = ((AbstractOperationEvent) record).getTraceId(); + traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceBuffer(); + traceId2trace.put(traceId, traceBuffer); + } + } + } + traceBuffer.insertEvent((AbstractOperationEvent) record); + loggingTimestamp = ((AbstractOperationEvent) record) + .getLoggingTimestamp(); + } + else { + return; // invalid type which should not happen due to the specified + // eventTypes + } + if (traceBuffer.isFinished()) { + synchronized (this) { // has to be synchronized because of timeout + // cleanup + traceId2trace.remove(traceId); + } + super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, + traceBuffer.toTraceEvents()); + } + if (timeout) { + synchronized (this) { + // can we assume a rough order of logging timestamps? (yes, + // except with DB reader) + if (loggingTimestamp > maxEncounteredLoggingTimestamp) { + maxEncounteredLoggingTimestamp = loggingTimestamp; + } + processTimeoutQueue(maxEncounteredLoggingTimestamp); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + synchronized (this) { + for (final Entry<Long, TraceBuffer> entry : traceId2trace + .entrySet()) { + final TraceBuffer traceBuffer = entry.getValue(); + if (traceBuffer.isInvalid()) { + super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID, + traceBuffer.toTraceEvents()); + } + else { + super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, + traceBuffer.toTraceEvents()); + } + } + traceId2trace.clear(); + } + } + + // only called within synchronized! We assume timestamps >= 0 + private void processTimeoutQueue(final long timestamp) { + final long duration = timestamp - maxTraceDuration; + final long traceTimeout = timestamp - maxTraceTimeout; + for (final Iterator<Entry<Long, TraceBuffer>> iterator = traceId2trace + .entrySet().iterator(); iterator.hasNext();) { + final TraceBuffer traceBuffer = iterator.next().getValue(); + if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long + // time + // no see + || (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max + // duration + // is + // gone + if (traceBuffer.isInvalid()) { + super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID, + traceBuffer.toTraceEvents()); + } + else { + super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, + traceBuffer.toTraceEvents()); + } + iterator.remove(); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, + timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, + String.valueOf(maxTraceDuration)); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, + String.valueOf(maxTraceTimeout)); + return configuration; + } + + /** + * The TraceBuffer is synchronized to prevent problems with concurrent + * access. + * + * @author Jan Waller + */ + private static final class TraceBuffer { + private static final Log LOG = LogFactory + .getLog(TraceBuffer.class); + private static final Comparator<AbstractOperationEvent> COMPARATOR = new TraceOperationComperator(); + + private Trace trace; + private final SortedSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( + COMPARATOR); + + private boolean closeable; + private boolean damaged; + private int openEvents; + private int maxOrderIndex = -1; + + private long minLoggingTimestamp = Long.MAX_VALUE; + private long maxLoggingTimestamp = -1; + + private long traceId = -1; + + /** + * Creates a new instance of this class. + */ + public TraceBuffer() { + // default empty constructor + } + + public void insertEvent(final AbstractOperationEvent record) { + final long myTraceId = record.getTraceId(); + synchronized (this) { + if (traceId == -1) { + traceId = myTraceId; + } + else if (traceId != myTraceId) { + LOG.error("Invalid traceId! Expected: " + traceId + + " but found: " + myTraceId + " in event " + + record.toString()); + damaged = true; + } + final long loggingTimestamp = record.getLoggingTimestamp(); + if (loggingTimestamp > maxLoggingTimestamp) { + maxLoggingTimestamp = loggingTimestamp; + } + if (loggingTimestamp < minLoggingTimestamp) { + minLoggingTimestamp = loggingTimestamp; + } + final int orderIndex = record.getOrderIndex(); + if (orderIndex > maxOrderIndex) { + maxOrderIndex = orderIndex; + } + if (record instanceof BeforeOperationEvent) { + if (orderIndex == 0) { + closeable = true; + } + openEvents++; + } + else if (record instanceof AfterOperationEvent) { + openEvents--; + } + else if (record instanceof AfterFailedOperationEvent) { + openEvents--; + } + if (!events.add(record)) { + LOG.error("Duplicate entry for orderIndex " + orderIndex + + " with traceId " + myTraceId); + damaged = true; + } + } + } + + public void setTrace(final Trace trace) { + final long myTraceId = trace.getTraceId(); + synchronized (this) { + if (traceId == -1) { + traceId = myTraceId; + } + else if (traceId != myTraceId) { + LOG.error("Invalid traceId! Expected: " + traceId + + " but found: " + myTraceId + " in trace " + + trace.toString()); + damaged = true; + } + if (this.trace == null) { + this.trace = trace; + } + else { + LOG.error("Duplicate Trace entry for traceId " + myTraceId); + damaged = true; + } + } + } + + public boolean isFinished() { + synchronized (this) { + return closeable && !isInvalid(); + } + } + + public boolean isInvalid() { + synchronized (this) { + return (trace == null) + || damaged + || (openEvents != 0) + || (((maxOrderIndex + 1) != events.size()) || events + .isEmpty()); + } + } + + public TraceEventRecords toTraceEvents() { + synchronized (this) { + return new TraceEventRecords( + trace, + events.toArray(new AbstractOperationEvent[events.size()])); + } + } + + public long getMaxLoggingTimestamp() { + synchronized (this) { + return maxLoggingTimestamp; + } + } + + public long getMinLoggingTimestamp() { + synchronized (this) { + return minLoggingTimestamp; + } + } + + /** + * @author Jan Waller + */ + private static final class TraceOperationComperator implements + Comparator<AbstractOperationEvent> { + public TraceOperationComperator() {} + + public int compare(final AbstractOperationEvent o1, + final AbstractOperationEvent o2) { + return o1.getOrderIndex() - o2.getOrderIndex(); + } + } + } +} diff --git a/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java b/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..8f088086c2a951b648895ef16d52e40e3f1e3e6c --- /dev/null +++ b/src/explorviz/hpc_monitoring/plugin/TraceEventRecordAggregationFilter.java @@ -0,0 +1,263 @@ +package explorviz.hpc_monitoring.plugin; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import explorviz.hpc_monitoring.record.TraceEventRecords; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +/** + * This filter collects incoming traces for a specified amount of time. + * Any traces representing the same series of events will be used to calculate + * statistic informations like the average runtime of this kind of trace. + * Only one specimen of these traces containing this information will be + * forwarded + * from this filter. + * + * Statistic outliers regarding the runtime of the trace will be treated special + * and therefore send out as they are and will not be mixed with others. + * + * + * @author Florian Biss + * + * @since 1.8 + */ + +@Plugin(description = "This filter tries to aggregate similar TraceEventRecordss into a single trace.", outputPorts = { @OutputPort(name = TraceEventRecordAggregationFilter.OUTPUT_PORT_NAME_TRACES, description = "Output port for the processed traces", eventTypes = { TraceEventRecords.class }) }, configuration = { + @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION), + @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION) }) +public class TraceEventRecordAggregationFilter extends AbstractFilterPlugin { + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut"; + + /** + * The name of the input port receiving the trace records. + */ + public static final String INPUT_PORT_NAME_TRACES = "tracesIn"; + + /** + * The name of the property determining the time unit. + */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + + /** + * Clock input for timeout handling. + */ + public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; + + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + /** + * The name of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration"; + + /** + * The default value of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "4000000000"; + + /** + * The name of the property determining the maximal runtime deviation + * factor. + * + * Outliers are indicated by + * <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code> + * . + * Use negative number to aggregate all traces. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation"; + + /** + * The default value of the property determining the maximal runtime + * deviation factor. + * Default is two standard deviations. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "-1"; + + private final TimeUnit timeunit; + private final long maxCollectionDuration; + private final long maxDeviation; + + private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer; + + public TraceEventRecordAggregationFilter(final Configuration configuration, + final IProjectContext projectContext) { + super(configuration, projectContext); + + final String recordTimeunitProperty = projectContext + .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } + catch (final IllegalArgumentException ex) { + recordTimeunit = TimeUnit.NANOSECONDS; + } + timeunit = recordTimeunit; + + maxDeviation = configuration + .getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION); + + maxCollectionDuration = timeunit.convert(configuration + .getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION), + timeunit); + trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>( + new TraceComperator()); + } + + @InputPort(name = INPUT_PORT_NAME_TRACES, description = "Collect identical traces and aggregate them.", eventTypes = { TraceEventRecords.class }) + public void newEvent(final TraceEventRecords event) { + synchronized (this) { + insertIntoBuffer(event); + } + } + + private void insertIntoBuffer(final TraceEventRecords trace) { + TraceAggregationBuffer traceBuffer; + traceBuffer = trace2buffer.get(trace); + + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = trace2buffer.get(trace); + + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceAggregationBuffer(System.nanoTime()); + trace2buffer.put(trace, traceBuffer); + } + + } + } + synchronized (this) { + traceBuffer.insertTrace(trace); + } + } + + @InputPort(name = INPUT_PORT_NAME_TIME_EVENT, description = "Time signal for timeouts", eventTypes = { Long.class }) + public void newEvent(final Long timestamp) { + synchronized (this) { + processTimeoutQueue(timestamp); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + synchronized (this) { + for (final TraceAggregationBuffer traceBuffer : trace2buffer + .values()) { + super.deliver(OUTPUT_PORT_NAME_TRACES, + traceBuffer.getAggregatedTrace()); + } + trace2buffer.clear(); + } + } + + private void processTimeoutQueue(final long timestamp) { + final long bufferTimeout = timestamp - maxCollectionDuration; + List<TraceEventRecords> toRemove = new ArrayList<TraceEventRecords>(); + for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { + TraceEventRecords aggregatedTrace = traceBuffer + .getAggregatedTrace(); + super.deliver(OUTPUT_PORT_NAME_TRACES, aggregatedTrace); + toRemove.add(aggregatedTrace); + } + } + for (TraceEventRecords traceEventRecords : toRemove) { + trace2buffer.remove(traceEventRecords); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, + timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, + String.valueOf(maxCollectionDuration)); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION, + String.valueOf(maxDeviation)); + return configuration; + } + + private static final class TraceAggregationBuffer { + private TraceEventRecords accumulator; + + private final long bufferCreatedTimestamp; + + public TraceAggregationBuffer(final long bufferCreatedTimestamp) { + this.bufferCreatedTimestamp = bufferCreatedTimestamp; + } + + public long getBufferCreatedTimestamp() { + return bufferCreatedTimestamp; + } + + public TraceEventRecords getAggregatedTrace() { + return accumulator; + } + + public void insertTrace(final TraceEventRecords trace) { + aggregate(trace); + } + + private void aggregate(final TraceEventRecords trace) { + if (accumulator == null) { + accumulator = trace; + } + else { + final AbstractOperationEvent[] aggregatedRecords = accumulator + .getTraceOperations(); + final AbstractOperationEvent[] records = trace + .getTraceOperations(); + for (int i = 0; i < aggregatedRecords.length; i++) { + aggregatedRecords[i].getRuntime().merge( + records[i].getRuntime()); + } + + accumulator.getRuntime().merge(trace.getRuntime()); + } + } + } + + private static final class TraceComperator implements + Comparator<TraceEventRecords> { + + public TraceComperator() {} + + public int compare(final TraceEventRecords t1, + final TraceEventRecords t2) { + final AbstractOperationEvent[] recordsT1 = t1.getTraceOperations(); + final AbstractOperationEvent[] recordsT2 = t2.getTraceOperations(); + + if ((recordsT1.length - recordsT2.length) != 0) { + return recordsT1.length - recordsT2.length; + } + + final int cmpHostnames = t1.getTrace().getHostname() + .compareTo(t2.getTrace().getHostname()); + if (cmpHostnames != 0) { + return cmpHostnames; + } + + // TODO deep check records + return 0; + } + } + +} diff --git a/src/explorviz/hpc_monitoring/reader/RabbitMQReader.java b/src/explorviz/hpc_monitoring/reader/RabbitMQReader.java new file mode 100644 index 0000000000000000000000000000000000000000..58ffd94b17e250c19aa3bc757e47e12c16697a0e --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/RabbitMQReader.java @@ -0,0 +1,426 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.reader; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.zip.Inflater; +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.*; +import kieker.analysis.plugin.reader.AbstractReaderPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import com.rabbitmq.client.*; +import explorviz.hpc_monitoring.Bits; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.events.*; + +/** + * Reads monitoring records from the queue of an established RabbitMQ + * connection. + * + * @author Santje Finke + * + * @since 1.8 + */ +@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = { + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "kieker"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") + +}) +public final class RabbitMQReader extends AbstractReaderPlugin { + + /** The name of the output port delivering the received records. */ + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + /** The name of the configuration determining the RabbitMQ provider URL. */ + public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; + /** + * The name of the configuration determining the RabbitMQ Queue (e.g. + * queue1). + */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; + /** The username that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** The password that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + /** The port that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PORT = "5672"; + + static final Log LOG = LogFactory + .getLog(RabbitMQReader.class); // NOPMD + // package + // for + // inner + // class + + private final String providerUrl; + private final String queueName; + private final String password; + private final String username; + private final int port; + private Connection connection; + private Channel channel; + private ConnectionFactory factory; + private QueueingConsumer normalConsumer; + + private final CountDownLatch cdLatch = new CountDownLatch( + 1); + + private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>( + 16, + 0.75f, + 2); + + private final RabbitMQRegistryConsumer registryConsumer; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} + * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. + * {@code port} + * </ul> + * @param projectContext + * The project context for this component. + * + * @throws IllegalArgumentException + * If one of the properties is empty. + */ + public RabbitMQReader(final Configuration configuration, + final IProjectContext projectContext) + throws IllegalArgumentException { + super(configuration, projectContext); + + // Initialize the reader bases on the given configuration. + providerUrl = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); + queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + username = configuration.getStringProperty(CONFIG_PROPERTY_USER); + password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); + // simple sanity check + if ((providerUrl.length() == 0) || (queueName.length() == 0)) { + throw new IllegalArgumentException( + "RabbitMQReader has not sufficient parameters. providerUrl ('" + + providerUrl + "') or destination ('" + queueName + + "'), is null"); + } + + registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, + "registryRecords", username, password, port); + registryConsumer.start(); + } + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} + * </ul> + * + * @throws IllegalArgumentException + * If one of the properties is empty. + * + * @deprecated To be removed in Kieker 1.8. + */ + @Deprecated + public RabbitMQReader(final Configuration configuration) + throws IllegalArgumentException { + this(configuration, null); + } + + /** + * A call to this method is a blocking call. + * + * @return true if the method succeeds, false otherwise. + */ + public boolean read() { + boolean retVal = true; + try { + createConnectionFactory(); + connect(); + while (!Thread.interrupted()) { + + if (!connection.isOpen() || !channel.isOpen()) { + reconnect(); + } + + final QueueingConsumer.Delivery delivery = normalConsumer + .nextDelivery(); + + byte[] batchedMessages = delivery.getBody(); + messagesfromByteArray(batchedMessages); + + normalConsumer.getChannel().basicAck( + delivery.getEnvelope().getDeliveryTag(), false); + } + } + catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + LOG.error("Error in read()", ex); + retVal = false; + } + catch (final ShutdownSignalException e) { + LOG.error("Error in read(): ShutdownSignal Exception", e); + retVal = false; + } + catch (final ConsumerCancelledException e) { + LOG.error("Error in read(): ConsumerCancelled Exception", e); + retVal = false; + } + catch (final InterruptedException e) { + LOG.error("Error in read(): Interrupted Exception", e); + retVal = false; + } + finally { + try { + if (connection != null) { + connection.close(); + } + } + catch (final IOException e) { + LOG.error("Error in read()", e); + } + } + return retVal; + } + + public byte[] decompressByteArray(byte[] bytes) { + + ByteArrayOutputStream baos = null; + Inflater iflr = new Inflater(); + iflr.setInput(bytes); + baos = new ByteArrayOutputStream(); + byte[] tmp = new byte[4096]; + try { + while (!iflr.finished()) { + int size = iflr.inflate(tmp); + baos.write(tmp, 0, size); + } + } + catch (Exception ex) { + + } + finally { + try { + if (baos != null) { + baos.close(); + } + } + catch (Exception ex) {} + } + + return baos.toByteArray(); + } + + private void createConnectionFactory() { + factory = new ConnectionFactory(); + factory.setHost(providerUrl); + factory.setPort(port); + factory.setConnectionTimeout(0); + factory.setUsername(username); + factory.setPassword(password); + } + + private void connect() throws IOException { + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.queueDeclare(queueName, false, false, false, null); + + normalConsumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, false, normalConsumer); + channel.basicQos(50); + } + + private void reconnect() { + try { + connect(); + } + catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); + } + } + + private void messagesfromByteArray(final byte[] b) { + int offset = 0; + + int firstValue; + while ((firstValue = Bits.getInt(b, offset)) != -1) { + offset += 4; + + IMonitoringRecord record = null; + + switch (firstValue) { + case 0: { + final long traceId = Bits.getLong(b, offset); + offset += 8; + final Integer hostnameId = Bits.getInt(b, offset); + offset += 4; + final long parentTraceId = Bits.getLong(b, offset); + offset += 8; + final int parentOrderId = Bits.getInt(b, offset); + offset += 4; + offset += 4; // dummy for nextOrderIndex + + record = new Trace(traceId, + getStringFromRegistry(hostnameId), parentTraceId, + parentOrderId); + break; + } + case 1: { + final long timestamp = Bits.getLong(b, offset); + offset += 8; + final long traceId = Bits.getLong(b, offset); + offset += 8; + final int orderIndex = Bits.getInt(b, offset); + offset += 4; + final Integer operationId = Bits.getInt(b, offset); + offset += 4; + + record = new BeforeOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + case 2: { + final long timestamp = Bits.getLong(b, offset); + offset += 8; + final long traceId = Bits.getLong(b, offset); + offset += 8; + final int orderIndex = Bits.getInt(b, offset); + offset += 4; + final Integer operationId = Bits.getInt(b, offset); + offset += 4; + final Integer causeId = Bits.getInt(b, offset); + offset += 4; + + record = new AfterFailedOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId), + getStringFromRegistry(causeId)); + break; + } + case 3: { + final long timestamp = Bits.getLong(b, offset); + offset += 8; + final long traceId = Bits.getLong(b, offset); + offset += 8; + final int orderIndex = Bits.getInt(b, offset); + offset += 4; + final Integer operationId = Bits.getInt(b, offset); + offset += 4; + + record = new AfterOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + default: { + LOG.error("unknown class id " + firstValue); + } + } + + if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { + LOG.error("deliverRecord returned false"); + } + } + } + + final void unblock() { // NOPMD (package visible for inner class) + cdLatch.countDown(); + } + + /** + * {@inheritDoc} + */ + public void terminate(final boolean error) { + LOG.info("Shutdown of RabbitMQReader requested."); + registryConsumer.interrupt(); + unblock(); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + + configuration + .setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl); + configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName); + configuration.setProperty(CONFIG_PROPERTY_USER, username); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); + + return configuration; + } + + public void addToRegistry(Integer key, String value) { + stringRegistry.put(key, value); + System.out.println(key + " " + value); + + synchronized (this) { + notifyAll(); + } + } + + private String getStringFromRegistry(Integer id) { + String result = stringRegistry.get(id); + while (result == null) { + try { + synchronized (this) { + System.out.println("waiting for " + id); + this.wait(); + } + } + catch (InterruptedException e) { + e.printStackTrace(); + } + result = stringRegistry.get(id); + } + + return result; + } +} diff --git a/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java b/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..58d55ad8daecebab0245b39a12cc8d3ea77aa1ea --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/RabbitMQRegistryConsumer.java @@ -0,0 +1,127 @@ +package explorviz.hpc_monitoring.reader; + +import java.io.*; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import com.rabbitmq.client.*; +import explorviz.hpc_monitoring.StringRegistryRecord; + +public class RabbitMQRegistryConsumer extends Thread { + + static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD + // package + // for + // inner + // class + + private final RabbitMQReader parent; + + private Connection connection; + private Channel channel; + private ConnectionFactory factory; + private QueueingConsumer registryConsumer; + + private final String providerUrl; + private final String queueName; + private final String password; + private final String username; + private final int port; + + public RabbitMQRegistryConsumer(RabbitMQReader parent, String providerUrl, + String queueName, String password, String username, int port) { + this.parent = parent; + this.providerUrl = providerUrl; + this.queueName = queueName; + this.password = password; + this.username = username; + this.port = port; + + } + + @Override + public void run() { + try { + createConnectionFactory(); + + connect(); + + while (!Thread.interrupted()) { + + if (!connection.isOpen() || !channel.isOpen()) { + reconnect(); + } + + final QueueingConsumer.Delivery delivery = registryConsumer + .nextDelivery(); + + final Object message = readObjectFromBytes(delivery.getBody()); + + if (message instanceof StringRegistryRecord) { + final StringRegistryRecord record = (StringRegistryRecord) message; + parent.addToRegistry(record.getId(), record.getObject()); + } + } + } + catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + LOG.error("Error in read()", ex); + } + catch (final ClassNotFoundException e) { + LOG.error("Error in read(): ClassNotFound Exception", e); + } + catch (final ShutdownSignalException e) { + LOG.error("Error in read(): ShutdownSignal Exception", e); + } + catch (final ConsumerCancelledException e) { + LOG.error("Error in read(): ConsumerCancelled Exception", e); + } + catch (final InterruptedException e) { + LOG.error("Error in read(): Interrupted Exception", e); + } + finally { + try { + if (connection != null) { + connection.close(); + } + } + catch (final IOException e) { + LOG.error("Error in read()", e); + } + } + } + + private void createConnectionFactory() { + factory = new ConnectionFactory(); + factory.setHost(providerUrl); + factory.setPort(port); + factory.setConnectionTimeout(0); + factory.setUsername(username); + factory.setPassword(password); + } + + private void connect() throws IOException { + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.queueDeclare(queueName, false, false, false, null); + + registryConsumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, true, registryConsumer); + } + + private void reconnect() { + try { + connect(); + } + catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); + } + } + + private Object readObjectFromBytes(final byte[] bs) throws IOException, + ClassNotFoundException { + final ByteArrayInputStream bain = new ByteArrayInputStream(bs); + final ObjectInputStream in = new ObjectInputStream(bain); + final Object message = in.readObject(); + return message; + } +} diff --git a/src/explorviz/hpc_monitoring/record/RuntimeStatisticInformation.java b/src/explorviz/hpc_monitoring/record/RuntimeStatisticInformation.java new file mode 100644 index 0000000000000000000000000000000000000000..caa824c6f1f1d6096aeac0002649d7cfcfa6eb5d --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/RuntimeStatisticInformation.java @@ -0,0 +1,122 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.record; + +import java.io.Serializable; + +/** + * Provides methods to calculate the minimum, maximum, average and the standard + * deviation of numerical data. + * + * @author Florian Biss + * + * @since 1.8 + */ +public final class RuntimeStatisticInformation implements Serializable { + + private static final long serialVersionUID = -1628273045707598143L; + + private long count = 0; + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; + // Using doubles to handle numbers above Integer.MAX_VALUE. Overflows are + // much worse than losing precision here. + private double sum; + private double squareSum; + + public RuntimeStatisticInformation(final long runtime) { + set(runtime); + } + + public long getCount() { + return count; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public long getAvg() { + if (count > 0) { + return (long) (sum / count); + } + else { + return -1; + } + + } + + public long getStandardDeviation() { + if (count <= 2) { + return -1; + } + else { + final double variance = (squareSum - ((sum * sum) / count)) + / (count - 1); + return (long) Math.sqrt(variance); + } + + } + + public double getSum() { + return sum; + } + + public double getSquareSum() { + return squareSum; + } + + public void insert(final long data) { + + count++; + final double dataDouble = data; + sum += dataDouble; + squareSum += dataDouble * dataDouble; + min = Math.min(data, min); + max = Math.max(data, max); + + } + + public void set(final long data) { + count = 1; + // final double dataDouble = data; + // sum = dataDouble; + // squareSum = dataDouble * dataDouble; + // max = data; + // min = data; + } + + public void merge(final RuntimeStatisticInformation statistics) { + + count += statistics.getCount(); + // sum += statistics.getSum(); + // squareSum += statistics.getSquareSum(); + // min = Math.min(statistics.getMin(), min); + // max = Math.max(statistics.getMax(), max); + + } + + @Override + public String toString() { + return count + ":" + min + ":" + max + ":" + sum + ":" + squareSum; + } + +} diff --git a/src/explorviz/hpc_monitoring/record/Trace.java b/src/explorviz/hpc_monitoring/record/Trace.java new file mode 100644 index 0000000000000000000000000000000000000000..f70a3b65f3c750a3118328bd8bdd40434ced3140 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/Trace.java @@ -0,0 +1,67 @@ +package explorviz.hpc_monitoring.record; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; + +public class Trace implements IFlowRecord { + private static final long serialVersionUID = 4028885353683254444L; + + private final long traceId; + private final String hostname; + private final long parentTraceId; + private final int parentOrderId; + + public Trace(long traceId, String hostname, long parentTraceId, + int parentOrderId) { + this.traceId = traceId; + this.hostname = hostname; + this.parentTraceId = parentTraceId; + this.parentOrderId = parentOrderId; + } + + @Override + public long getLoggingTimestamp() { + throw new UnsupportedOperationException(); + } + + @Override + public Class<?>[] getValueTypes() { + throw new UnsupportedOperationException(); + } + + @Override + public void initFromArray(Object[] arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLoggingTimestamp(long arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(IMonitoringRecord o) { + throw new UnsupportedOperationException(); + } + + public long getTraceId() { + return traceId; + } + + public String getHostname() { + return hostname; + } + + public long getParentTraceId() { + return parentTraceId; + } + + public int getParentOrderId() { + return parentOrderId; + } +} diff --git a/src/explorviz/hpc_monitoring/record/TraceEventRecords.java b/src/explorviz/hpc_monitoring/record/TraceEventRecords.java new file mode 100644 index 0000000000000000000000000000000000000000..a42bab49d59adcc37cd127e8dfa5621eee5469c0 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/TraceEventRecords.java @@ -0,0 +1,113 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package explorviz.hpc_monitoring.record; + +import java.io.Serializable; +import java.util.Arrays; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public final class TraceEventRecords implements Serializable { + private static final long serialVersionUID = 8589405631073291022L; + + private final Trace trace; + private final AbstractOperationEvent[] traceEvents; + private final RuntimeStatisticInformation runtimeInformation; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param trace + * The trace to be stored in this object. + * @param traceEvents + * The trace events to be stored in this object. + */ + public TraceEventRecords(final Trace trace, + final AbstractOperationEvent[] traceEvents) { // NOPMD (stored + // directly) + this.trace = trace; + this.traceEvents = traceEvents; + runtimeInformation = new RuntimeStatisticInformation(1); // TODO + } + + public Trace getTrace() { + return trace; + } + + public AbstractOperationEvent[] getTraceOperations() { + return traceEvents; // NOPMD (internal array exposed) + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(64); + sb.append(super.toString()); + sb.append("\n\tTrace: "); + sb.append(trace); + for (final AbstractOperationEvent traceEvent : traceEvents) { + sb.append("\n\t"); + sb.append(traceEvent.getClass().getSimpleName()); + sb.append(": "); + sb.append(traceEvent); + } + sb.append("\n\t"); + sb.append(runtimeInformation.getClass().getSimpleName()); + sb.append(": "); + sb.append(runtimeInformation); + sb.append('\n'); + return sb.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = (prime * result) + ((trace == null) ? 0 : trace.hashCode()); // NOCS + // (?:) + result = (prime * result) + Arrays.hashCode(traceEvents); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (this.getClass() != obj.getClass()) { + return false; + } + final TraceEventRecords other = (TraceEventRecords) obj; + if (trace == null) { + if (other.trace != null) { + return false; + } + } + else if (!trace.equals(other.trace)) { + return false; + } + if (!Arrays.equals(traceEvents, other.traceEvents)) { + return false; + } + return true; + } + + public RuntimeStatisticInformation getRuntime() { + return runtimeInformation; + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..28953eff4bc6748c16fe9bef951c7f054773b85e --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java @@ -0,0 +1,70 @@ +package explorviz.hpc_monitoring.record.events; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; +import explorviz.hpc_monitoring.record.RuntimeStatisticInformation; + +public class AbstractOperationEvent implements IFlowRecord { + private static final long serialVersionUID = 1224383944280820758L; + private final long timestamp; + private final long traceId; + private final int orderIndex; + private final String operationSignature; + private final RuntimeStatisticInformation runtimeStatisticInformation; + + public AbstractOperationEvent(long timestamp, long traceId, int orderIndex, + String operationSignature) { + super(); + this.timestamp = timestamp; + this.traceId = traceId; + this.orderIndex = orderIndex; + this.operationSignature = operationSignature; + runtimeStatisticInformation = new RuntimeStatisticInformation(timestamp); + } + + @Override + public long getLoggingTimestamp() { + return timestamp; + } + + @Override + public Class<?>[] getValueTypes() { + throw new UnsupportedOperationException(); + } + + @Override + public void initFromArray(Object[] arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLoggingTimestamp(long arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(IMonitoringRecord o) { + throw new UnsupportedOperationException(); + } + + public long getTraceId() { + return traceId; + } + + public int getOrderIndex() { + return orderIndex; + } + + public String getOperationSignature() { + return operationSignature; + } + + public RuntimeStatisticInformation getRuntime() { + return runtimeStatisticInformation; + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..d78b9a0bccc5d79b78fed45d7fb4e8e0d2094668 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java @@ -0,0 +1,18 @@ +package explorviz.hpc_monitoring.record.events; + +public class AfterFailedOperationEvent extends AbstractOperationEvent { + private static final long serialVersionUID = -7488112271381329123L; + + private final String cause; + + public AfterFailedOperationEvent(long timestamp, long traceId, + int orderIndex, String operationSignature, String cause) { + super(timestamp, traceId, orderIndex, operationSignature); + + this.cause = cause; + } + + public String getCause() { + return cause; + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..4ccd1c6a0d6b17274226d262931a25013e3c2e74 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events; + +public class AfterOperationEvent extends AbstractOperationEvent { + private static final long serialVersionUID = 6136529395371343882L; + + public AfterOperationEvent(long timestamp, long traceId, int orderIndex, + String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..99cd35081b58f08c3a2581691bbf61254488c3be --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events; + +public class BeforeOperationEvent extends AbstractOperationEvent { + private static final long serialVersionUID = 1465085019211054059L; + + public BeforeOperationEvent(long timestamp, long traceId, int orderIndex, + String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index 5611f7d3cc2cc1627f6a760817ca697bbb5a7c69..926da1644a959e5788bdeb74a4bb2b8f680db23b 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -3,39 +3,64 @@ package explorviz.worker.main import kieker.analysis.AnalysisController import kieker.common.configuration.Configuration import kieker.analysis.IAnalysisController -import kieker.analysis.plugin.reader.mq.RabbitMQReader -import kieker.analysis.plugin.filter.forward.CountingFilter import kieker.analysis.plugin.filter.forward.TeeFilter import kieker.analysis.plugin.filter.forward.CountingThroughputFilter -import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter + +import explorviz.hpc_monitoring.reader.RabbitMQReader +import kieker.analysis.plugin.reader.timer.TimeReader +import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter +import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter +import explorviz.hpc_monitoring.connector.RabbitMQConnector class WorkerController { - - var IAnalysisController analysisInstance - - def start() { + + var IAnalysisController analysisInstance + + def start() { analysisInstance = new AnalysisController() val rabbitMQ = initRabbitMQ() - - val config = new Configuration() -// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") -// val countingFilter = new CountingThroughputFilter(config, analysisInstance) - val teeFilter = initTeeFilter() - - val eventTraceReconstructionFilter = - new EventRecordTraceReconstructionFilter(config, analysisInstance); - - analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) - analysisInstance.connect(eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, teeFilter, TeeFilter::INPUT_PORT_NAME_EVENTS) - - try { - analysisInstance.run() + val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() + val aggregationFilter = initAggregationFilter() + val timer = initTimer() +// val countingThroughputFilter = initCountingThroughputFilter() +// val teeFilter = initTeeFilter() + val rabbitMQConnector = initRabbitMQConnector() + + analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) + +// analysisInstance.connect(eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, +// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) + + analysisInstance.connect(eventTraceReconstructionFilter, + EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector, + RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) + + analysisInstance.connect(timer, + TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, + TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) + +// analysisInstance.connect(aggregationFilter, +// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, teeFilter, +// TeeFilter::INPUT_PORT_NAME_EVENTS) + + analysisInstance.connect(aggregationFilter, + TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector, + RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES) + + try { + analysisInstance.run() } catch (Exception e) { - e.printStackTrace + e.printStackTrace } } - + def initRabbitMQ() { val rabbitConfig = new Configuration() rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost") @@ -43,13 +68,38 @@ class WorkerController { new RabbitMQReader(rabbitConfig, analysisInstance) } - def initCountingFilter() { - val config = new Configuration() - new CountingFilter(config, analysisInstance) - } - - def initTeeFilter() { - val config = new Configuration() - new TeeFilter(config, analysisInstance) - } -} \ No newline at end of file + def initRabbitMQConnector() { + val rabbitConfig = new Configuration() + rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost") + rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "validTraces") + new RabbitMQConnector(rabbitConfig, analysisInstance) + } + + def initAggregationFilter() { + val config = new Configuration() + new TraceEventRecordAggregationFilter(config, analysisInstance) + } + + def initTimer() { + val config = new Configuration() + config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, "1000000000") + config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_DELAY_NS, "0") + new TimeReader(config, analysisInstance) + } + + def initCountingThroughputFilter() { + val config = new Configuration() + config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") + new CountingThroughputFilter(config, analysisInstance) + } + + def initEventRecordTraceReconstructionFilter() { + val config = new Configuration() + new EventRecordTraceReconstructionFilter(config, analysisInstance) + } + + def initTeeFilter() { + val config = new Configuration() + new TeeFilter(config, analysisInstance) + } +} diff --git a/src/kieker/analysis/plugin/Bits.java b/src/kieker/analysis/plugin/Bits.java new file mode 100644 index 0000000000000000000000000000000000000000..77c943dbc82d4cc4f6a876b25f1296a4b0fbdbfb --- /dev/null +++ b/src/kieker/analysis/plugin/Bits.java @@ -0,0 +1,84 @@ +package kieker.analysis.plugin; + +public class Bits { + + public static boolean getBoolean(byte[] b, int off) { + return b[off] != 0; + } + + public static char getChar(byte[] b, int off) { + return (char) ((b[off + 1] & 0xFF) + (b[off] << 8)); + } + + public static short getShort(byte[] b, int off) { + return (short) ((b[off + 1] & 0xFF) + (b[off] << 8)); + } + + public static int getInt(byte[] b, int off) { + return ((b[off + 3] & 0xFF)) + ((b[off + 2] & 0xFF) << 8) + + ((b[off + 1] & 0xFF) << 16) + ((b[off]) << 24); + } + + public static float getFloat(byte[] b, int off) { + return Float.intBitsToFloat(getInt(b, off)); + } + + public static long getLong(byte[] b, int off) { + return ((b[off + 7] & 0xFFL)) + ((b[off + 6] & 0xFFL) << 8) + + ((b[off + 5] & 0xFFL) << 16) + ((b[off + 4] & 0xFFL) << 24) + + ((b[off + 3] & 0xFFL) << 32) + ((b[off + 2] & 0xFFL) << 40) + + ((b[off + 1] & 0xFFL) << 48) + (((long) b[off]) << 56); + } + + public static double getDouble(byte[] b, int off) { + return Double.longBitsToDouble(getLong(b, off)); + } + + public static byte getByte(byte[] b, int off) { + return b[off]; + } + + public static void putBoolean(byte[] b, int off, boolean val) { + b[off] = (byte) (val ? 1 : 0); + } + + public static void putChar(byte[] b, int off, char val) { + b[off + 1] = (byte) (val); + b[off] = (byte) (val >>> 8); + } + + public static void putShort(byte[] b, int off, short val) { + b[off + 1] = (byte) (val); + b[off] = (byte) (val >>> 8); + } + + public static void putInt(byte[] b, int off, int val) { + b[off + 3] = (byte) (val); + b[off + 2] = (byte) (val >>> 8); + b[off + 1] = (byte) (val >>> 16); + b[off] = (byte) (val >>> 24); + } + + public static void putFloat(byte[] b, int off, float val) { + putInt(b, off, Float.floatToIntBits(val)); + } + + public static void putLong(byte[] b, int off, long val) { + b[off + 7] = (byte) (val); + b[off + 6] = (byte) (val >>> 8); + b[off + 5] = (byte) (val >>> 16); + b[off + 4] = (byte) (val >>> 24); + b[off + 3] = (byte) (val >>> 32); + b[off + 2] = (byte) (val >>> 40); + b[off + 1] = (byte) (val >>> 48); + b[off] = (byte) (val >>> 56); + } + + public static void putDouble(byte[] b, int off, double val) { + putLong(b, off, Double.doubleToLongBits(val)); + } + + public static void putByte(byte[] b, int off, byte val) { + b[off] = val; + } +} diff --git a/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java b/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java deleted file mode 100644 index a70b5127a39538e5ae68d9dd5e493fdc7dced306..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java +++ /dev/null @@ -1,194 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.connector.mq; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.InputPort; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; - -/** - * A plugin used for kieker in the cloud. - * All incoming events are put into a RabbitMQ, but are also passed to an output port that can be used for - * testing purposes. - * - * @author Santje Finke - * - * @since 1.8 - * - */ -@Plugin( - description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", - outputPorts = { - @OutputPort( - name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, - description = "Provides each incoming object") }, - configuration = { - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") - }) -public class RabbitMQConnector extends AbstractFilterPlugin { - - /** - * The name of the input port receiving the incoming events. - */ - public static final String INPUT_PORT_NAME_EVENTS = "inputEvents"; - /** - * The name of the output port passing the incoming events. - */ - public static final String OUTPUT_PORT_NAME = "relayedEvents"; - /** - * The name of the property determining the address of the used Server. - */ - public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; - /** - * The name of the property determining the name of the Queue. - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; - - /** - * The username that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** - * The password that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; - - private static final Log LOG = LogFactory.getLog(RabbitMQConnector.class); - - private final String provider; - private final String queue; - private final String password; - private final String username; - private Connection connection; - private Channel channel; - private final ConnectionFactory factory; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this plugin - * - * @deprecated To be removed in Kieker 1.8. - */ - @Deprecated - public RabbitMQConnector(final Configuration configuration) { - this(configuration, null); - } - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public RabbitMQConnector(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - this.provider = configuration.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); - this.queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); - this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - this.password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); - this.factory = new ConnectionFactory(); - this.factory.setHost(this.provider); - this.factory.setConnectionTimeout(0); - this.factory.setUsername(this.username); - this.factory.setPassword(this.password); - try { - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - this.channel.queueDeclare(this.queue, false, false, false, null); - } catch (final IOException e) { - LOG.info("Error establishing connection", e); - } - LOG.info("Sending to destination:" + this.queue + " at " + this.provider + " !\n***\n\n"); - - } - - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, this.provider); - configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queue); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); - configuration.setProperty(CONFIG_PROPERTY_USER, this.username); - return configuration; - } - - /** - * This method represents the input port of this filter. - * - * @param event - * The next event. - */ - @InputPort( - name = INPUT_PORT_NAME_EVENTS, - eventTypes = { Object.class }, - description = "Receives incoming objects to be forwarded to a queue") - public final void inputEvent(final Object event) { - super.deliver(OUTPUT_PORT_NAME, event); - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - ObjectOutputStream out; - try { - out = new ObjectOutputStream(boas); - out.writeObject(event); - out.close(); - final byte[] message = boas.toByteArray(); - - if (!this.connection.isOpen() || !this.channel.isOpen()) { - this.reconnect(); - } - - this.channel.basicPublish("", this.queue, null, message); - } catch (final IOException e) { - RabbitMQConnector.LOG.error("Error sending event", e); - } - - } - - /** - * Establishes a connection to a rabbitMQ channel with the current connection informationen. - */ - private void reconnect() { - try { - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - this.channel.queueDeclare(this.queue, false, false, false, null); - } catch (final IOException e) { - RabbitMQConnector.LOG.error("Error reestablishing connection", e); - } - } - -} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java deleted file mode 100644 index 139dc36bdbf2599eb502a9afdbc8b0547b7af6e1..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java +++ /dev/null @@ -1,452 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.filter.flow; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.InputPort; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.AbstractTraceEvent; -import kieker.common.record.flow.trace.WorkflowRecord; -import kieker.common.record.flow.trace.WorkflowTrace; - -/** - * This filter merges partial traces with the same trace id into one complete trace. - * Incomplete traces will be delivered after a specified timeout. - * - * @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels - * - * @since 1.8 - */ -@Plugin(name = "Final Trace Reconstruction Filter (Workflow)", - description = "This filter merges partial WorkflowTraces into complete traces.", - outputPorts = { - @OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_VALID_TRACES, - description = "Forwards valid traces", - eventTypes = { WorkflowTrace.class }), - @OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_INVALID_TRACES, - description = "Forwards invalid traces", - eventTypes = { WorkflowTrace.class }) - }, - configuration = { - @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, - defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, - defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME), - @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, - defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME) - }) -public class WorkflowRecordFinalTraceReconstructionFilter extends AbstractFilterPlugin { - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_VALID_TRACES = "validTraces"; - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_INVALID_TRACES = "invalidTraces"; - /** - * The name of the input port receiving the trace records. - */ - public static final String INPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces"; - - /** - * The name of the time trigger input port. - */ - public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; - - /** - * The name of the property determining the time unit. - */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - /** - * The name of the property determining the maximal trace duration, the time - * this filter waits for new partial traces of a trace. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration"; - /** - * The name of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout"; - /** - * The default value of the properties for the maximal trace duration and timeout. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE) - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - private static final Log LOG = LogFactory.getLog(WorkflowRecordFinalTraceReconstructionFilter.class); - - private final TimeUnit timeunit; - private final long maxTraceDuration; - private final long maxTraceTimeout; - - private final ConcurrentMap<Long, TraceBuffer> traceId2trace; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public WorkflowRecordFinalTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS - final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen - LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); - recordTimeunit = TimeUnit.NANOSECONDS; - } - this.timeunit = recordTimeunit; - } else { - this.timeunit = TimeUnit.NANOSECONDS; - } - - final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); - TimeUnit configTimeunit; - try { - configTimeunit = TimeUnit.valueOf(configTimeunitProperty); - } catch (final IllegalArgumentException ex) { - LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); - configTimeunit = this.timeunit; - } - - this.maxTraceDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION), configTimeunit); - this.maxTraceTimeout = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT), configTimeunit); - this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); - } - - /** - * This method is the input port for the timeout. - * - * @param timestamp - * The actual nanotime - */ - @InputPort( - name = INPUT_PORT_NAME_TIME_EVENT, - description = "Input port for periodic a time signal", - eventTypes = { Long.class }) - public void newEvent(final Long timestamp) { - synchronized (this) { - for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { - final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp(); - final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp(); - if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max duration or timeout is gone - super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace()); - // Concurrent modification allowed by ConcurrentMap - this.traceId2trace.remove(traceBuffer.getTraceId()); - } - } - } - } - - /** - * This method is the input port for the new events for this filter. - * - * @param record - * The new record to handle. - */ - @InputPort( - name = INPUT_PORT_NAME_PARTIAL_TRACES, - description = "Input port for partial traces from WorkflowRecordPartialTraceReconstructionFilters", - eventTypes = { WorkflowTrace.class }) - public void newEvent(final IFlowRecord record) { - final Long traceId; - TraceBuffer traceBuffer; - final WorkflowTrace trace; - - if (record instanceof WorkflowTrace) { - trace = (WorkflowTrace) record; - - if (trace.isComplete()) { - super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, trace); // Nothing to do here - return; - } else { - traceId = trace.getTraceId(); - synchronized (this) { - traceBuffer = this.getTraceBuffer(traceId); - traceBuffer.insertTrace(trace); - } - } - } else { - LOG.error("Invalid input type at " + INPUT_PORT_NAME_PARTIAL_TRACES - + " in WorkflowRecordFinalTraceReconstructionFilter"); - return; // invalid type - } - - synchronized (this) { - if (traceBuffer.isComplete()) { - this.traceId2trace.remove(traceId); - super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, traceBuffer.toWorkflowTrace()); - } - } - } - - private TraceBuffer getTraceBuffer(final Long traceId) { - TraceBuffer traceBuffer; - traceBuffer = this.traceId2trace.get(traceId); - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = this.traceId2trace.get(traceId); - - if (traceBuffer == null) { // NOCS (DCL) - final TraceBuffer newTraceBuffer = new TraceBuffer(this.timeunit); - traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer); - if (traceBuffer == null) { - traceBuffer = newTraceBuffer; - } - } - } - } - return traceBuffer; - } - - /** - * {@inheritDoc} - */ - @Override - public void terminate(final boolean error) { - synchronized (this) { - for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { - super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace()); - } - this.traceId2trace.clear(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, String.valueOf(this.maxTraceDuration)); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, String.valueOf(this.maxTraceTimeout)); - return configuration; - } - - /** - * Buffer for events from partial traces that will be turned into a single trace. - * - * @author Florian Biss - */ - private static final class TraceBuffer { - private static final Log LOG = LogFactory.getLog(TraceBuffer.class); - private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator(); - - private final SortedSet<AbstractTraceEvent> events = new TreeSet<AbstractTraceEvent>(COMPARATOR); - - private boolean damaged; - private int maxOrderIndex = -1; - - private final TimeUnit timeunit; - - private long minLoggingTimestamp = Long.MAX_VALUE; - private long maxLoggingTimestamp = -1; - - private long traceId = -1; - private boolean hasStart; - private boolean hasEnd; - - /** - * Creates a new buffer. - * - * @param timeunit - * TimetUnit used for logging timestamps. - */ - public TraceBuffer(final TimeUnit timeunit) { - this.timeunit = timeunit; - } - - /** - * Store all events of a partial trace in the buffer. - * - * @param paritalTrace - * A partial trace - */ - public void insertTrace(final WorkflowTrace paritalTrace) { - final long myTraceId = paritalTrace.getTraceId(); - // Time information in partial traces are old (partial traces timed out on a worker), - // use current system time instead. - final long loggingTimestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS); - - synchronized (this) { - - if (this.traceId == -1) { - this.traceId = myTraceId; - this.minLoggingTimestamp = loggingTimestamp; - } else if (this.traceId != myTraceId) { - LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in trace " + paritalTrace.toString()); - this.damaged = true; - } - - this.maxLoggingTimestamp = loggingTimestamp; - - final WorkflowRecord[] newEvents = paritalTrace.getTraceEvents(); - - for (final WorkflowRecord event : newEvents) { - this.insertEvent(event); - } - - if (paritalTrace.isDamaged()) { - this.damaged = true; - } - } - } - - private void insertEvent(final WorkflowRecord event) { - final int orderIndex = event.getOrderIndex(); - if (orderIndex > this.maxOrderIndex) { - this.maxOrderIndex = orderIndex; - } - - if (event.isStart()) { - if (this.hasStart) { - LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString()); - this.damaged = true; - } - this.hasStart = true; - } - - if (event.isEnd()) { - if (this.hasEnd) { - LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString()); - this.damaged = true; - } - this.hasEnd = true; - } - - if (!this.events.add(event)) { - LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + this.traceId); - this.damaged = true; - } - } - - /** - * @return - * <code>true</code> if all records are present and the trace has a start and an end record. - */ - public boolean isComplete() { - synchronized (this) { - return ((this.maxOrderIndex + 1) == this.events.size()) && !this.events.isEmpty() - && this.hasEnd && this.hasStart && !this.damaged; - } - } - - /** - * @return <code>true</code> if the trace in this buffer is damaged. - */ - public boolean isDamaged() { - synchronized (this) { - return this.damaged; - } - } - - /** - * @return The trace id - */ - public long getTraceId() { - synchronized (this) { - return this.traceId; - } - } - - /** - * Process this buffer into a WorkflowTrace containing all buffered events. - * - * @return A new WorkflowTrace - */ - public WorkflowTrace toWorkflowTrace() { - synchronized (this) { - return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]), - this.isComplete(), this.isDamaged()); - } - } - - /** - * {@inheritDoc} - */ - @Override - public String toString() { - return this.toWorkflowTrace().toString(); - } - - /** - * @return Insertion timestamp of the latest event in this buffer - */ - public long getMaxLoggingTimestamp() { - synchronized (this) { - return this.maxLoggingTimestamp; - } - } - - /** - * @return Insertion timestamp of the first event in this buffer - */ - public long getMinLoggingTimestamp() { - synchronized (this) { - return this.minLoggingTimestamp; - } - } - - /** - * Compares two trace events by their order index. - * - * @author Jan Waller - */ - private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable { - private static final long serialVersionUID = 89207356648232517L; - - /** - * Creates a new instance of this class. - */ - public TraceEventComperator() { - // default empty constructor - } - - /** - * {@inheritDoc} - */ - public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) { - return o1.getOrderIndex() - o2.getOrderIndex(); - } - } - } -} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java deleted file mode 100644 index ef15f0491c937ac2f25121d633762f0eaecfab1e..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java +++ /dev/null @@ -1,388 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.filter.flow; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.InputPort; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.AbstractTraceEvent; -import kieker.common.record.flow.trace.WorkflowRecord; -import kieker.common.record.flow.trace.WorkflowTrace; - -/** - * This filter collects WorkflowRecords and constructs ordered traces of them. - * - * - * @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels - * - * @since 1.8 - */ -@Plugin(name = "Partial Trace Reconstruction Filter (Workflow)", - description = "This filter bundles WorkflowRecords into a trace", - outputPorts = { - @OutputPort(name = WorkflowRecordPartialTraceReconstructionFilter.OUTPUT_PORT_NAME_PARTIAL_TRACES, - description = "Forwards the constructed partial and complete traces", - eventTypes = { WorkflowTrace.class }) - }, - configuration = { - @Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS, - defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS), - @Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS, - defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS) - }) -public class WorkflowRecordPartialTraceReconstructionFilter extends AbstractFilterPlugin { - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces"; - /** - * The name of the input port receiving the trace records. - */ - public static final String INPUT_PORT_NAME_WORKFLOW_RECORDS = "workflowRecords"; - - /** - * The name of the input port receiving the time stamps. - */ - public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; - /** - * The name of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS = "maxTraceTimeout"; - - /** - * The default value of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS = "1000000000"; // 1 seconds - - /** - * The name of the property determining the maximal trace duration. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS = "maxTraceDuration"; - - /** - * The default value of the property determining the maximal trace duration. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS = "5000000000"; // 5 seconds - - private static final Log LOG = LogFactory.getLog(WorkflowRecordPartialTraceReconstructionFilter.class); - - private final ConcurrentMap<Long, TraceBuffer> traceId2trace; - private final long maxTraceTimeout; - private final long maxTraceDuration; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public WorkflowRecordPartialTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); - - this.maxTraceTimeout = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS); - this.maxTraceDuration = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS); - - } - - /** - * This method is the input port for the timeout. - * - * @param timestamp - * The actual nanotime - */ - @InputPort( - name = INPUT_PORT_NAME_TIME_EVENT, - description = "Input port for periodic a time signal", - eventTypes = { Long.class }) - public void newEvent(final Long timestamp) { - synchronized (this) { - for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { - final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp(); - final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp(); - if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max timeout or duration is gone - this.traceId2trace.remove(traceBuffer.getTraceID()); - super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); - } - } - } - } - - /** - * This method is the input port for the new events for this filter. - * - * @param record - * The new record to handle. - */ - @InputPort( - name = INPUT_PORT_NAME_WORKFLOW_RECORDS, - description = "Input port for WorkflowRecords", - eventTypes = { WorkflowRecord.class }) - public void newEvent(final IFlowRecord record) { - - final Long traceId; - TraceBuffer traceBuffer; - - if (record instanceof WorkflowRecord) { - final WorkflowRecord traceRecord = (WorkflowRecord) record; - traceId = traceRecord.getTraceId(); - traceBuffer = this.getTraceBuffer(traceId); - - synchronized (this) { - traceBuffer.insertEvent(traceRecord); - - if (traceBuffer.hasEnd()) { - this.traceId2trace.remove(traceId); - super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); - } - } - - } else { - LOG.error("Invalid event type in WorkflowRecordPartialTraceReconstructionFilter"); - return; // invalid type which should not happen due to the specified eventTypes - } - - } - - private TraceBuffer getTraceBuffer(final Long traceId) { - TraceBuffer traceBuffer; - traceBuffer = this.traceId2trace.get(traceId); - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = this.traceId2trace.get(traceId); - - if (traceBuffer == null) { // NOCS (DCL) - final TraceBuffer newTraceBuffer = new TraceBuffer(); - traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer); - if (traceBuffer == null) { - traceBuffer = newTraceBuffer; - } - } - } - } - return traceBuffer; - } - - /** - * {@inheritDoc} - */ - @Override - public void terminate(final boolean error) { - this.deliverAllBuffer(); - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS, String.valueOf(this.maxTraceTimeout)); - return configuration; - } - - /** - * This method delivers all traceBuffer and clears the buffer. - */ - private void deliverAllBuffer() { - synchronized (this) { - for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { - super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); - } - this.traceId2trace.clear(); - } - } - - /** - * Buffer for records that will be bundled into a single trace. - * - * @author Florian Biss - */ - private static final class TraceBuffer { - private static final Log LOG = LogFactory.getLog(TraceBuffer.class); - private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator(); - - private final SortedSet<WorkflowRecord> events = new TreeSet<WorkflowRecord>(COMPARATOR); - - private boolean damaged; - private int maxOrderIndex = -1; - - private long minLoggingTimestamp = Long.MAX_VALUE; - private long maxLoggingTimestamp = -1; - - private long traceId = -1; - private boolean hasStart; - private boolean ended; - - /** - * Creates a new buffer. - */ - public TraceBuffer() { - // default empty constructor - } - - /** - * @return The trace id - */ - public Object getTraceID() { - synchronized (this) { - return this.traceId; - } - } - - /** - * Insert a new event into buffer. - * - * @param event - * New event - */ - public void insertEvent(final WorkflowRecord event) { - final long myTraceId = event.getTraceId(); - synchronized (this) { - - final long currentTime = System.nanoTime(); - if (this.traceId == -1) { - this.traceId = myTraceId; - this.minLoggingTimestamp = currentTime; - } else if (this.traceId != myTraceId) { - LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in event " + event.toString()); - this.damaged = true; - } - - this.maxLoggingTimestamp = currentTime; - - final int orderIndex = event.getOrderIndex(); - if (orderIndex > this.maxOrderIndex) { - this.maxOrderIndex = orderIndex; - } - - if (event.isStart()) { - if (this.hasStart) { - LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString()); - this.damaged = true; - } - this.hasStart = true; - } - - if (event.isEnd()) { - if (this.ended) { - LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString()); - this.damaged = true; - } - this.ended = true; - } - - if (!this.events.add(event)) { - LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + myTraceId); - this.damaged = true; - } - } - } - - /** - * @return <code>true</code> if this buffer contains a trace end record - */ - public boolean hasEnd() { - synchronized (this) { - return this.ended; - } - } - - /** - * @return <code>true</code> if this buffer contains all records of a trace, including start and end record - */ - public boolean isComplete() { - synchronized (this) { - return ((this.maxOrderIndex + 1) == this.events.size()) - && !this.events.isEmpty() && this.ended && this.hasStart; - } - } - - /** - * @return <code>true</code> if the trace in this buffer is damaged - */ - public boolean isDamaged() { - synchronized (this) { - return this.damaged; - } - } - - /** - * Process buffer into a trace. - * - * @return - * A new trace containing the buffered events - */ - public WorkflowTrace toWorkflowTrace() { - return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]), - this.isComplete(), this.isDamaged()); - } - - /** - * @return Youngest time stamp in trace - */ - public long getMaxLoggingTimestamp() { - synchronized (this) { - return this.maxLoggingTimestamp; - } - } - - /** - * @return Oldest time stamp in trace - */ - public long getMinLoggingTimestamp() { - synchronized (this) { - return this.minLoggingTimestamp; - } - } - - /** - * @author Jan Waller - */ - private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable { - private static final long serialVersionUID = 89207356648232517L; - - /** - * Creates a new instance of this class. - */ - public TraceEventComperator() { - // default empty constructor - } - - public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) { - return o1.getOrderIndex() - o2.getOrderIndex(); - } - } - } -} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java deleted file mode 100644 index da5e9c7e2411545aaf4333f53be60776235eabe0..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java +++ /dev/null @@ -1,496 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.analysis.plugin.filter.flow; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.InputPort; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.WorkflowRecord; -import kieker.common.record.flow.trace.WorkflowTrace; -import kieker.common.util.StatisticInformation; - -/** - * This filter collects incoming traces for a specified amount of time. - * Any traces representing the same series of events will be used to calculate - * statistic informations like the average runtime of this kind of trace. - * Only one specimen of these traces containing this information will be forwarded - * from this filter. - * - * Statistic outliers regarding the runtime of the trace will be treated special - * and therefore send out as they are and will not be mixed with others. - * - * - * @author Florian Biss - * - * @since 1.8 - */ - -@Plugin(description = "This filter tries to agglomerate similar WorkflowTraces into a single trace.", - outputPorts = { - @OutputPort(name = WorkflowRecordTraceAgglomerationFilter.OUTPUT_PORT_NAME_TRACES, - description = "Output port for the processed traces", - eventTypes = { WorkflowTrace.class }) - }, - configuration = { - @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, - defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, - defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION), - @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION, - defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION) - }) -public class WorkflowRecordTraceAgglomerationFilter extends AbstractFilterPlugin { - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut"; - - /** - * The name of the input port receiving the trace records. - */ - public static final String INPUT_PORT_NAME_TRACES = "tracesIn"; - - /** - * The name of the property determining the time unit. - */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - - /** - * Clock input for timeout handling. - */ - public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; - - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - /** - * The name of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration"; - - /** - * The default value of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "5000000000"; - - /** - * The name of the property determining the maximal runtime deviation factor. - * - * Outliers are indicated by <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code>. - * Use negative number to agglomerate all traces. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation"; - - /** - * The default value of the property determining the maximal runtime deviation factor. - * Default is two standard deviations. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "2"; - - private static final Log LOG = LogFactory.getLog(WorkflowRecordTraceAgglomerationFilter.class); - - private final TimeUnit timeunit; - private final long maxCollectionDuration; - private final long maxDeviation; - - private final Map<WorkflowTrace, TraceAgglomerationBuffer> trace2buffer; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public WorkflowRecordTraceAgglomerationFilter(final Configuration configuration, final IProjectContext projectContext) { - super(configuration, projectContext); - - if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS - final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen - LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); - recordTimeunit = TimeUnit.NANOSECONDS; - } - this.timeunit = recordTimeunit; - } else { - this.timeunit = TimeUnit.NANOSECONDS; - } - this.maxDeviation = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION); - final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); - TimeUnit configTimeunit; - try { - configTimeunit = TimeUnit.valueOf(configTimeunitProperty); - } catch (final IllegalArgumentException ex) { - LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); - configTimeunit = this.timeunit; - } - this.maxCollectionDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION), configTimeunit); - this.trace2buffer = new TreeMap<WorkflowTrace, TraceAgglomerationBuffer>(new TraceComperator()); - } - - /** - * This method is the input port for incoming traces. - * - * @param record - * A WorkflowTrace - */ - @InputPort( - name = INPUT_PORT_NAME_TRACES, - description = "Collect identical traces and agglomerate them.", - eventTypes = { WorkflowTrace.class }) - public void newEvent(final IFlowRecord event) { - final WorkflowTrace trace; - synchronized (this) { - if (event instanceof WorkflowTrace) { - trace = (WorkflowTrace) event; - - if (!trace.isComplete() || trace.isDamaged()) { - super.deliver(OUTPUT_PORT_NAME_TRACES, trace); // Incomplete or damaged? Nothing to do here. - return; - } else { - this.insertIntoBuffer(trace); - } - } else { - LOG.error("Invalid input type at " + OUTPUT_PORT_NAME_TRACES - + " in WorkflowRecordTraceAgglomerationFilter"); - return; // invalid type - } - } - } - - /** - * Inserts a WorkflowTrace into the buffer. - * - * @param trace - * The WorkflowTrace that will be inserted - */ - private void insertIntoBuffer(final WorkflowTrace trace) { - - TraceAgglomerationBuffer traceBuffer; - final long timestamp; - timestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS); - traceBuffer = this.trace2buffer.get(trace); - - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = this.trace2buffer.get(trace); - - if (traceBuffer == null) { // NOCS (DCL) - traceBuffer = new TraceAgglomerationBuffer(timestamp, this.maxDeviation); - this.trace2buffer.put(trace, traceBuffer); - } - - } - } - - traceBuffer.insertTrace(trace); - } - - /** - * This method is the input port for the timeout. - * - * @param timestamp - * The current nanotime - */ - @InputPort( - name = INPUT_PORT_NAME_TIME_EVENT, - description = "Time signal for timeouts", - eventTypes = { Long.class }) - public void newEvent(final Long timestamp) { - synchronized (this) { - this.processTimeoutQueue(timestamp); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void terminate(final boolean error) { - synchronized (this) { - // Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer. - final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values()); - for (final TraceAgglomerationBuffer traceBuffer : buffers) { - this.deliverBuffer(traceBuffer); - } - this.trace2buffer.clear(); - } - } - - private void processTimeoutQueue(final long timestamp) { - final long bufferTimeout = timestamp - this.maxCollectionDuration; - // Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer. - final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values()); - for (final TraceAgglomerationBuffer traceBuffer : buffers) { - if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { - this.deliverBuffer(traceBuffer); - } - } - } - - /** - * Deliver and remove a buffer. - * - * @param traceBuffer - */ - private void deliverBuffer(final TraceAgglomerationBuffer traceBuffer) { - final List<WorkflowTrace> traces = traceBuffer.processBuffer(); - for (final WorkflowTrace trace : traces) { - super.deliver(OUTPUT_PORT_NAME_TRACES, trace); - } - // Kill the buffer - if (this.trace2buffer.remove(traces.get(0)) == null) { - LOG.warn("Removal of buffer failed."); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, String.valueOf(this.maxCollectionDuration)); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION, String.valueOf(this.maxDeviation)); - return configuration; - } - - /** - * Buffer for similar traces that are to be agglomerated into a single trace. - * - * @author Florian Biß - */ - private static final class TraceAgglomerationBuffer { - private static final Log LOG = LogFactory.getLog(TraceAgglomerationBuffer.class); - - /** Contains all buffered traces for statistical purposes. */ - private final List<WorkflowTrace> traces = new ArrayList<WorkflowTrace>(); - - /** Contains each unique trace at most once. */ - private WorkflowTrace agglomeratedTraces; - - private final long bufferCreatedTimestamp; - - /** - * Maximal runtime deviation factor. Use negative number to agglomerate all traces. - * - * Outliers are indicated by <code>averageRuntime - deviationFactor * standardDeviation > runtime</code> or - * <code>runtime > averageRuntime + deviationFactor * standardDeviation</code> - * - */ - private final long deviationFactor; - - /** - * Creates a new instance of this class. - */ - public TraceAgglomerationBuffer(final long bufferCreatedTimestamp, final long maxDeviation) { - this.bufferCreatedTimestamp = bufferCreatedTimestamp; - this.deviationFactor = maxDeviation; - } - - /** - * Insert a trace into this buffer. - * - * @param Trace - * to insert - */ - public void insertTrace(final WorkflowTrace trace) { - if (LOG.isDebugEnabled()) { - LOG.debug("Inserting into AgglomerationBuffer: " + trace.toString()); - } - synchronized (this) { - this.traces.add(trace); - } - } - - /** - * Agglomerate all traces that can and should be agglomerated. - * - * @return List of agglomerated traces and statistic outliers. - */ - public List<WorkflowTrace> processBuffer() { - final List<WorkflowTrace> processed = new ArrayList<WorkflowTrace>(); - final StatisticInformation tmpRuntime; - - synchronized (this) { - tmpRuntime = this.getTempBufferStatistic(); - for (final WorkflowTrace trace : this.traces) { - - // If deviationFactor is negative do not care about outliers. - if ((this.deviationFactor > 0) && this.isOutlier(tmpRuntime, trace.getRuntime())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Statistical outlier: " + trace.toString()); - } - // Outlier. Do not agglomerate. - processed.add(trace); - } else { - // Add to agglomeratedTraces. - this.agglomerate(trace); - } - - } - - processed.add(this.agglomeratedTraces); - } - - return processed; - } - - /** - * Merge a trace with the agglomerated traces. - * - * @param trace - * Trace to agglomerate - */ - private void agglomerate(final WorkflowTrace trace) { - if (this.agglomeratedTraces == null) { - // Trace is new, add to map. - this.agglomeratedTraces = trace; - } else { - // Trace exists, merge statistics for every record. - final WorkflowRecord[] agglomeratedRecords = this.agglomeratedTraces.getTraceEvents(); - final WorkflowRecord[] records = trace.getTraceEvents(); - for (int i = 0; i < agglomeratedRecords.length; i++) { - agglomeratedRecords[i].getRuntime().merge(records[i].getRuntime()); - } - - this.agglomeratedTraces.getRuntime().merge(trace.getRuntime()); - } - } - - /** - * Detect outliers. - * - * @param averagedRuntimes - * Statistics to check against - * @param recordRuntime - * Data to check - * @return - * <code>true</code> if both are too different - */ - private boolean isOutlier(final StatisticInformation averagedRuntimes, final StatisticInformation recordRuntime) { - final long maxDeviation = averagedRuntimes.getStandardDeviation() * this.deviationFactor; - if (maxDeviation < 0) { - return true; - } - return Math.abs(recordRuntime.getAvg() - averagedRuntimes.getAvg()) > maxDeviation; - } - - /** - * @return Creation time of this buffer. - */ - public long getBufferCreatedTimestamp() { - synchronized (this) { - return this.bufferCreatedTimestamp; - } - } - - /** - * The the statistical runtime informations about a buffer so far. - * - * @return Runtime informations. - */ - private StatisticInformation getTempBufferStatistic() { - final StatisticInformation tmpStatistic = new StatisticInformation(); - - for (final WorkflowTrace trace : this.traces) { - tmpStatistic.merge(trace.getRuntime()); - - } - return tmpStatistic; - } - } - - /** - * Compares traces based on their content instead of traceIds. - * Order of comparisons: Process name -> Trace lenght -> Id of each node in trace. - * If it all matches both traces are similar and may be agglomerated, albeit TraceId and runtimes might differ. - * - * @author Florian Biß - * - */ - private static final class TraceComperator implements Comparator<WorkflowTrace>, Serializable { - private static final long serialVersionUID = 8920766818232517L; - private static final Log LOG = LogFactory.getLog(TraceComperator.class); - - /** - * Creates a new instance of this class. - */ - public TraceComperator() { - // default empty constructor - } - - /** - * {@inheritDoc} - */ - public int compare(final WorkflowTrace t1, final WorkflowTrace t2) { - final int compProcesses = t1.getProcessName().compareTo(t2.getProcessName()); - if (compProcesses != 0) { - return compProcesses; - } - - final WorkflowRecord[] recordsT1 = t1.getTraceEvents(); - final WorkflowRecord[] recordsT2 = t2.getTraceEvents(); - - if ((recordsT1.length - recordsT2.length) != 0) { - return recordsT1.length - recordsT2.length; - } - - // Records in traces are already sorted by orderIndex, only compare nodeIds. - for (int i = 0; i < recordsT1.length; i++) { - final WorkflowRecord r1 = recordsT1[i]; - final WorkflowRecord r2 = recordsT2[i]; - final long idDiff = r1.getNodeId() - r2.getNodeId(); - - if ((idDiff > Integer.MAX_VALUE) || (idDiff < Integer.MIN_VALUE)) { - LOG.warn("Overflow during thread comparison!"); - } - - if (idDiff != 0) { - return (int) (r1.getNodeId() - r2.getNodeId()); - } - } - - // All records match. - return 0; - } - } - -} diff --git a/src/kieker/analysis/plugin/reader/mq/Bits.java b/src/kieker/analysis/plugin/reader/mq/Bits.java deleted file mode 100644 index 1f4de169dfcd293803d5d749f1069a2c0b00a175..0000000000000000000000000000000000000000 --- a/src/kieker/analysis/plugin/reader/mq/Bits.java +++ /dev/null @@ -1,92 +0,0 @@ -package kieker.analysis.plugin.reader.mq; - -public class Bits { - - static boolean getBoolean(byte[] b, int off) { - return b[off] != 0; - } - - static char getChar(byte[] b, int off) { - return (char) ((b[off + 1] & 0xFF) + - (b[off] << 8)); - } - - static short getShort(byte[] b, int off) { - return (short) ((b[off + 1] & 0xFF) + - (b[off] << 8)); - } - - static int getInt(byte[] b, int off) { - return ((b[off + 3] & 0xFF) ) + - ((b[off + 2] & 0xFF) << 8) + - ((b[off + 1] & 0xFF) << 16) + - ((b[off ] ) << 24); - } - - static float getFloat(byte[] b, int off) { - return Float.intBitsToFloat(getInt(b, off)); - } - - static long getLong(byte[] b, int off) { - return ((b[off + 7] & 0xFFL) ) + - ((b[off + 6] & 0xFFL) << 8) + - ((b[off + 5] & 0xFFL) << 16) + - ((b[off + 4] & 0xFFL) << 24) + - ((b[off + 3] & 0xFFL) << 32) + - ((b[off + 2] & 0xFFL) << 40) + - ((b[off + 1] & 0xFFL) << 48) + - (((long) b[off]) << 56); - } - - static double getDouble(byte[] b, int off) { - return Double.longBitsToDouble(getLong(b, off)); - } - - public static byte getByte(byte[] b, int off) { - return b[off]; - } - - static void putBoolean(byte[] b, int off, boolean val) { - b[off] = (byte) (val ? 1 : 0); - } - - static void putChar(byte[] b, int off, char val) { - b[off + 1] = (byte) (val ); - b[off ] = (byte) (val >>> 8); - } - - static void putShort(byte[] b, int off, short val) { - b[off + 1] = (byte) (val ); - b[off ] = (byte) (val >>> 8); - } - - static void putInt(byte[] b, int off, int val) { - b[off + 3] = (byte) (val ); - b[off + 2] = (byte) (val >>> 8); - b[off + 1] = (byte) (val >>> 16); - b[off ] = (byte) (val >>> 24); - } - - static void putFloat(byte[] b, int off, float val) { - putInt(b, off, Float.floatToIntBits(val)); - } - - static void putLong(byte[] b, int off, long val) { - b[off + 7] = (byte) (val ); - b[off + 6] = (byte) (val >>> 8); - b[off + 5] = (byte) (val >>> 16); - b[off + 4] = (byte) (val >>> 24); - b[off + 3] = (byte) (val >>> 32); - b[off + 2] = (byte) (val >>> 40); - b[off + 1] = (byte) (val >>> 48); - b[off ] = (byte) (val >>> 56); - } - - static void putDouble(byte[] b, int off, double val) { - putLong(b, off, Double.doubleToLongBits(val)); - } - - public static void putByte(byte[] b, int off, byte val) { - b[off] = val; - } -} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java index 89d6125790ec0887e61f206037e44e39da7c0bde..a4d0323e04dc3db6fd84d5f3c78e82a2bc0aed34 100644 --- a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java @@ -1,12 +1,12 @@ /*************************************************************************** * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,18 +20,9 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ConsumerCancelledException; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.ShutdownSignalException; - import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.OutputPort; -import kieker.analysis.plugin.annotation.Plugin; -import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.Bits; +import kieker.analysis.plugin.annotation.*; import kieker.analysis.plugin.reader.AbstractReaderPlugin; import kieker.common.configuration.Configuration; import kieker.common.exception.MonitoringRecordException; @@ -39,7 +30,7 @@ import kieker.common.logging.Log; import kieker.common.logging.LogFactory; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; -import kieker.analysis.plugin.reader.mq.Bits; +import com.rabbitmq.client.*; /** * Reads monitoring records from the queue of an established RabbitMQ @@ -50,346 +41,365 @@ import kieker.analysis.plugin.reader.mq.Bits; * @since 1.8 */ @Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = { - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), - @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") }) public final class RabbitMQReader extends AbstractReaderPlugin { - /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - /** The name of the configuration determining the RabbitMQ provider URL. */ - public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; - /** - * The name of the configuration determining the RabbitMQ Queue (e.g. - * queue1). - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; - /** The username that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** The password that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; - /** The port that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PORT = "5672"; - - static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD - // package - // for inner - // class - - private final String providerUrl; - private final String queueName; - private final String password; - private final String username; - private final int port; - private Connection connection; - private Channel channel; - private ConnectionFactory factory; - private QueueingConsumer normalConsumer; - - private final CountDownLatch cdLatch = new CountDownLatch(1); - - private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(); - - private RabbitMQRegistryConsumer registryConsumer; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. - * {@code port} - * </ul> - * @param projectContext - * The project context for this component. - * - * @throws IllegalArgumentException - * If one of the properties is empty. - */ - public RabbitMQReader(final Configuration configuration, - final IProjectContext projectContext) - throws IllegalArgumentException { - super(configuration, projectContext); - - // Initialize the reader bases on the given configuration. - this.providerUrl = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); - this.queueName = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); - this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER); - this.password = configuration - .getStringProperty(CONFIG_PROPERTY_PASSWORD); - this.port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); - // simple sanity check - if ((this.providerUrl.length() == 0) || (this.queueName.length() == 0)) { - throw new IllegalArgumentException( - "RabbitMQReader has not sufficient parameters. providerUrl ('" - + this.providerUrl + "') or destination ('" - + this.queueName + "'), is null"); - } - - registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, - "registryRecords", username, password, port); - registryConsumer.start(); - } - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * </ul> - * - * @throws IllegalArgumentException - * If one of the properties is empty. - * - * @deprecated To be removed in Kieker 1.8. - */ - @Deprecated - public RabbitMQReader(final Configuration configuration) - throws IllegalArgumentException { - this(configuration, null); - } - - /** - * A call to this method is a blocking call. - * - * @return true if the method succeeds, false otherwise. - */ - public boolean read() { - boolean retVal = true; - try { - createConnectionFactory(); - connect(); - while (!Thread.interrupted()) { - - if (!this.connection.isOpen() || !this.channel.isOpen()) { - this.reconnect(); - } - - final QueueingConsumer.Delivery delivery = this.normalConsumer - .nextDelivery(); - - byte[] batchedMessages = delivery.getBody(); - messagesfromByteArray(batchedMessages, 0); - - this.normalConsumer.getChannel().basicAck( - delivery.getEnvelope().getDeliveryTag(), false); - } - } catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) - LOG.error("Error in read()", ex); - retVal = false; - } catch (final ShutdownSignalException e) { - LOG.error("Error in read(): ShutdownSignal Exception", e); - retVal = false; - } catch (final ConsumerCancelledException e) { - LOG.error("Error in read(): ConsumerCancelled Exception", e); - retVal = false; - } catch (final InterruptedException e) { - LOG.error("Error in read(): Interrupted Exception", e); - retVal = false; - } finally { - try { - if (this.connection != null) { - this.connection.close(); - } - } catch (final IOException e) { - LOG.error("Error in read()", e); - } - } - return retVal; - } - - private void createConnectionFactory() { - this.factory = new ConnectionFactory(); - this.factory.setHost(this.providerUrl); - this.factory.setPort(this.port); - this.factory.setConnectionTimeout(0); - this.factory.setUsername(this.username); - this.factory.setPassword(this.password); - } - - private void connect() throws IOException { - this.connection = this.factory.newConnection(); - this.channel = this.connection.createChannel(); - - this.channel.queueDeclare(this.queueName, false, false, false, null); - - this.normalConsumer = new QueueingConsumer(this.channel); - this.channel.basicConsume(this.queueName, false, this.normalConsumer); - this.channel.basicQos(10); - } - - private void reconnect() { - try { - connect(); - } catch (final IOException e) { - RabbitMQReader.LOG.error("Error reestablishing connection", e); - } - } - - private void messagesfromByteArray(final byte[] b, int initOffset) { - int offset = initOffset; - - while (Bits.getInt(b, offset) != -1) { - int firstValue = Bits.getInt(b, offset); - offset += 4; - - String classname = this.getStringFromRegistry(firstValue); - - Class<? extends IMonitoringRecord> clazz = null; - Class<?>[] recordTypes = null; - try { - clazz = AbstractMonitoringRecord.classForName(classname); - recordTypes = AbstractMonitoringRecord.typesForClass(clazz); - } catch (MonitoringRecordException e) { - LOG.error("could not create record", e); - } - - final long loggingTimestamp = Bits.getLong(b, offset); - offset += 8; - - final Object[] values = new Object[recordTypes.length]; - int valueIndex = 0; - - for (int i = 0; i < recordTypes.length; i++) { - if (recordTypes[i] == String.class) { - values[valueIndex] = this.getStringFromRegistry(Bits - .getInt(b, offset)); - offset += 4; - valueIndex++; - } else if (recordTypes[i] == Integer.class - || recordTypes[i] == int.class) { - values[valueIndex] = Bits.getInt(b, offset); - offset += 4; - valueIndex++; - } else if (recordTypes[i] == Long.class - || recordTypes[i] == long.class) { - values[valueIndex] = Bits.getLong(b, offset); - offset += 8; - valueIndex++; - } else if (recordTypes[i] == Float.class - || recordTypes[i] == float.class) { - values[valueIndex] = Bits.getFloat(b, offset); - offset += 4; - valueIndex++; - } else if (recordTypes[i] == Double.class - || recordTypes[i] == double.class) { - values[valueIndex] = Bits.getDouble(b, offset); - offset += 8; - valueIndex++; - } else if (recordTypes[i] == Byte.class - || recordTypes[i] == byte.class) { - values[valueIndex] = Bits.getByte(b, offset); - offset += 1; - valueIndex++; - } else if (recordTypes[i] == Short.class - || recordTypes[i] == short.class) { - values[valueIndex] = Bits.getShort(b, offset); - offset += 2; - valueIndex++; - } else if (recordTypes[i] == boolean.class - || recordTypes[i] == Boolean.class) { - values[valueIndex] = Bits.getBoolean(b, offset); - offset += 1; - valueIndex++; - } else { - values[valueIndex] = Bits.getByte(b, offset); - offset += 1; - valueIndex++; - } - } - IMonitoringRecord record = null; - try { - record = AbstractMonitoringRecord - .createFromArray(clazz, values); - } catch (MonitoringRecordException e) { - LOG.error("could not create record", e); - } - record.setLoggingTimestamp(loggingTimestamp); - - if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { - LOG.error("deliverRecord returned false"); - } - } - } - - final void unblock() { // NOPMD (package visible for inner class) - this.cdLatch.countDown(); - } - - /** - * {@inheritDoc} - */ - public void terminate(final boolean error) { - LOG.info("Shutdown of RabbitMQReader requested."); - registryConsumer.interrupt(); - this.unblock(); - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - - configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, - this.providerUrl); - configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queueName); - configuration.setProperty(CONFIG_PROPERTY_USER, this.username); - configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); - - return configuration; - } - - public void addToRegistry(Integer key, String value) { - stringRegistry.put(key, value); - - synchronized (this) { - this.notifyAll(); - } - } - - private String getStringFromRegistry(Integer id) { - String result = stringRegistry.get(id); - while (result == null) { - try { - synchronized (this) { - this.wait(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - result = stringRegistry.get(id); - } - - return result; - } + /** The name of the output port delivering the received records. */ + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + /** The name of the configuration determining the RabbitMQ provider URL. */ + public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; + /** + * The name of the configuration determining the RabbitMQ Queue (e.g. + * queue1). + */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; + /** The username that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** The password that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + /** The port that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PORT = "5672"; + + static final Log LOG = LogFactory + .getLog(RabbitMQReader.class); // NOPMD + // package + // for + // inner + // class + + private final String providerUrl; + private final String queueName; + private final String password; + private final String username; + private final int port; + private Connection connection; + private Channel channel; + private ConnectionFactory factory; + private QueueingConsumer normalConsumer; + + private final CountDownLatch cdLatch = new CountDownLatch( + 1); + + private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(); + + private final RabbitMQRegistryConsumer registryConsumer; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} + * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. + * {@code port} + * </ul> + * @param projectContext + * The project context for this component. + * + * @throws IllegalArgumentException + * If one of the properties is empty. + */ + public RabbitMQReader(final Configuration configuration, + final IProjectContext projectContext) + throws IllegalArgumentException { + super(configuration, projectContext); + + // Initialize the reader bases on the given configuration. + providerUrl = configuration + .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); + queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + username = configuration.getStringProperty(CONFIG_PROPERTY_USER); + password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); + // simple sanity check + if ((providerUrl.length() == 0) || (queueName.length() == 0)) { + throw new IllegalArgumentException( + "RabbitMQReader has not sufficient parameters. providerUrl ('" + + providerUrl + "') or destination ('" + queueName + + "'), is null"); + } + + registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, + "registryRecords", username, password, port); + registryConsumer.start(); + } + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in + * mind that the configuration should contain the following + * properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, + * e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. + * {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. + * {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. + * {@code username} + * </ul> + * + * @throws IllegalArgumentException + * If one of the properties is empty. + * + * @deprecated To be removed in Kieker 1.8. + */ + @Deprecated + public RabbitMQReader(final Configuration configuration) + throws IllegalArgumentException { + this(configuration, null); + } + + /** + * A call to this method is a blocking call. + * + * @return true if the method succeeds, false otherwise. + */ + public boolean read() { + boolean retVal = true; + try { + createConnectionFactory(); + connect(); + while (!Thread.interrupted()) { + + if (!connection.isOpen() || !channel.isOpen()) { + reconnect(); + } + + final QueueingConsumer.Delivery delivery = normalConsumer + .nextDelivery(); + + byte[] batchedMessages = delivery.getBody(); + messagesfromByteArray(batchedMessages, 0); + + normalConsumer.getChannel().basicAck( + delivery.getEnvelope().getDeliveryTag(), false); + } + } + catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + LOG.error("Error in read()", ex); + retVal = false; + } + catch (final ShutdownSignalException e) { + LOG.error("Error in read(): ShutdownSignal Exception", e); + retVal = false; + } + catch (final ConsumerCancelledException e) { + LOG.error("Error in read(): ConsumerCancelled Exception", e); + retVal = false; + } + catch (final InterruptedException e) { + LOG.error("Error in read(): Interrupted Exception", e); + retVal = false; + } + finally { + try { + if (connection != null) { + connection.close(); + } + } + catch (final IOException e) { + LOG.error("Error in read()", e); + } + } + return retVal; + } + + private void createConnectionFactory() { + factory = new ConnectionFactory(); + factory.setHost(providerUrl); + factory.setPort(port); + factory.setConnectionTimeout(0); + factory.setUsername(username); + factory.setPassword(password); + } + + private void connect() throws IOException { + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.queueDeclare(queueName, false, false, false, null); + + normalConsumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, false, normalConsumer); + channel.basicQos(10); + } + + private void reconnect() { + try { + connect(); + } + catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); + } + } + + private void messagesfromByteArray(final byte[] b, int initOffset) { + int offset = initOffset; + + while (Bits.getInt(b, offset) != -1) { + int firstValue = Bits.getInt(b, offset); + offset += 4; + + String classname = getStringFromRegistry(firstValue); + + Class<? extends IMonitoringRecord> clazz = null; + Class<?>[] recordTypes = null; + try { + clazz = AbstractMonitoringRecord.classForName(classname); + recordTypes = AbstractMonitoringRecord.typesForClass(clazz); + } + catch (MonitoringRecordException e) { + LOG.error("could not create record", e); + } + + final long loggingTimestamp = Bits.getLong(b, offset); + offset += 8; + + final Object[] values = new Object[recordTypes.length]; + int valueIndex = 0; + + for (Class<?> recordType : recordTypes) { + if (recordType == String.class) { + values[valueIndex] = getStringFromRegistry(Bits.getInt(b, + offset)); + offset += 4; + valueIndex++; + } + else if ((recordType == Integer.class) + || (recordType == int.class)) { + values[valueIndex] = Bits.getInt(b, offset); + offset += 4; + valueIndex++; + } + else if ((recordType == Long.class) + || (recordType == long.class)) { + values[valueIndex] = Bits.getLong(b, offset); + offset += 8; + valueIndex++; + } + else if ((recordType == Float.class) + || (recordType == float.class)) { + values[valueIndex] = Bits.getFloat(b, offset); + offset += 4; + valueIndex++; + } + else if ((recordType == Double.class) + || (recordType == double.class)) { + values[valueIndex] = Bits.getDouble(b, offset); + offset += 8; + valueIndex++; + } + else if ((recordType == Byte.class) + || (recordType == byte.class)) { + values[valueIndex] = Bits.getByte(b, offset); + offset += 1; + valueIndex++; + } + else if ((recordType == Short.class) + || (recordType == short.class)) { + values[valueIndex] = Bits.getShort(b, offset); + offset += 2; + valueIndex++; + } + else if ((recordType == boolean.class) + || (recordType == Boolean.class)) { + values[valueIndex] = Bits.getBoolean(b, offset); + offset += 1; + valueIndex++; + } + else { + values[valueIndex] = Bits.getByte(b, offset); + offset += 1; + valueIndex++; + } + } + IMonitoringRecord record = null; + try { + record = AbstractMonitoringRecord + .createFromArray(clazz, values); + } + catch (MonitoringRecordException e) { + LOG.error("could not create record", e); + } + record.setLoggingTimestamp(loggingTimestamp); + + if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { + LOG.error("deliverRecord returned false"); + } + } + } + + final void unblock() { // NOPMD (package visible for inner class) + cdLatch.countDown(); + } + + /** + * {@inheritDoc} + */ + public void terminate(final boolean error) { + LOG.info("Shutdown of RabbitMQReader requested."); + registryConsumer.interrupt(); + unblock(); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + + configuration + .setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl); + configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName); + configuration.setProperty(CONFIG_PROPERTY_USER, username); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password); + + return configuration; + } + + public void addToRegistry(Integer key, String value) { + stringRegistry.put(key, value); + + synchronized (this) { + notifyAll(); + } + } + + private String getStringFromRegistry(Integer id) { + String result = stringRegistry.get(id); + while (result == null) { + try { + synchronized (this) { + this.wait(); + } + } + catch (InterruptedException e) { + e.printStackTrace(); + } + result = stringRegistry.get(id); + } + + return result; + } }