Commit 13534d18 authored by Florian Fittkau

hpc monitoring

parent 1fd33438
Showing 2276 additions and 1657 deletions
@@ -2,9 +2,10 @@
<classpath>
<classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/>
<classpathentry kind="src" path="xtend-gen"/>
-<classpathentry excluding="kieker/analysis/plugin/filter/flow/" kind="src" path="src"/>
+<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/rabbitmq-client.jar"/>
-<classpathentry combineaccessrules="false" kind="src" path="/kieker"/>
+<classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/>
+<classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/>
<classpathentry kind="output" path="bin"/>
</classpath>
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package explorviz.hpc_monitoring.connector;
import java.io.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.Bits;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import com.rabbitmq.client.*;
import explorviz.hpc_monitoring.StringRegistryRecord;
/**
* A plugin used for Kieker in the cloud.
* All incoming events are put into a RabbitMQ queue, and are also passed to
* an output port that can be used for testing purposes.
*
* @author Santje Finke
*
* @since 1.8
*
*/
@Plugin(description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", outputPorts = { @OutputPort(name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, description = "Provides each incoming object") }, configuration = {
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") })
public class RabbitMQConnector extends AbstractFilterPlugin {
private static final int MESSAGE_BUFFER_SIZE = 1500;
/**
* The names of the input ports receiving the incoming valid and invalid traces.
*/
public static final String INPUT_PORT_NAME_INVALID_TRACES = "inputInvalidTraces";
public static final String INPUT_PORT_NAME_VALID_TRACES = "inputValidTraces";
/**
* The name of the output port passing the incoming events.
*/
public static final String OUTPUT_PORT_NAME = "relayedEvents";
/**
* The name of the property determining the address of the used Server.
*/
public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl";
/**
* The name of the property determining the name of the Queue.
*/
public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName";
/**
* The name of the property determining the username that is used to connect
* to a queue.
*/
public static final String CONFIG_PROPERTY_USER = "username";
/**
* The name of the property determining the password that is used to connect
* to a queue.
*/
public static final String CONFIG_PROPERTY_PASSWORD = "password";
private static final Log LOG = LogFactory
.getLog(RabbitMQConnector.class);
private ConnectionFactory factory;
private Connection connection;
private final String providerUrl;
private Channel channel;
private final String queue;
private String username = "guest";
private String password = "guest";
private final byte[] validTracesMessages = new byte[MESSAGE_BUFFER_SIZE];
private int validTracesMessagesOffset = 0;
private final byte[] invalidTracesMessages = new byte[MESSAGE_BUFFER_SIZE];
private int invalidTracesMessagesOffset = 0;
private final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>(
16,
0.75f,
2);
private final AtomicInteger stringRegIndex = new AtomicInteger(
0);
public RabbitMQConnector(final Configuration configuration,
final IProjectContext projectContext) {
super(configuration, projectContext);
providerUrl = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER);
queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE);
username = configuration.getStringProperty(CONFIG_PROPERTY_USER);
password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD);
createConnectionFactory(providerUrl);
try {
connect();
}
catch (final IOException e) {
LOG.error("Error connecting to RabbitMQ", e);
}
}
private void createConnectionFactory(final String provider) {
factory = new ConnectionFactory();
factory.setHost(provider);
factory.setConnectionTimeout(0);
factory.setUsername(username);
factory.setPassword(password);
}
private void connect() throws IOException {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("validTracesMaster", false, false, false, null);
channel.queueDeclare("invalidTracesMaster", false, false, false, null);
channel.queueDeclare("registryRecordsMaster", false, false, false, null);
}
private void sendRegistryRecord(StringRegistryRecord record) {
// TODO propagate to each new rabbitMQ queue
final ByteArrayOutputStream boas = new ByteArrayOutputStream();
ObjectOutputStream out;
try {
out = new ObjectOutputStream(boas);
out.writeObject(record);
out.close();
byte[] message2 = boas.toByteArray();
sendMessage(message2, "registryRecordsMaster");
}
catch (final IOException e) {
LOG.error("Error sending registry record", e);
}
}
/**
* This method represents the input port of this filter.
*
* @param monitoringRecord
* The next record.
*/
@InputPort(name = INPUT_PORT_NAME_VALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue")
public final void inputValidTraces(final Object monitoringRecord) {
try {
// if (monitoringRecord instanceof IMonitoringRecord) {
// byte[] message2 = toByteArray((IMonitoringRecord) monitoringRecord);
final ByteArrayOutputStream boas = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(boas);
out.writeObject(monitoringRecord);
out.close();
byte[] message2 = boas.toByteArray(); // TODO optimize
System.arraycopy(message2, 0, validTracesMessages,
validTracesMessagesOffset, message2.length);
validTracesMessagesOffset += message2.length;
// flush once less than 200 bytes of headroom remain
if (validTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - 200)) { // TODO unsafe || writeQueue.isEmpty()
Bits.putInt(validTracesMessages, validTracesMessagesOffset, -1);
sendMessage(validTracesMessages, "validTracesMaster");
validTracesMessagesOffset = 0;
}
}
catch (final IOException e) {
LOG.error("Error sending record", e);
}
}
/**
* This method represents the input port of this filter.
*
* @param monitoringRecord
* The next record.
*/
@InputPort(name = INPUT_PORT_NAME_INVALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue")
public final void inputInvalidTraces(final Object monitoringRecord) {
try {
// if (monitoringRecord instanceof IMonitoringRecord) {
// byte[] message2 = toByteArray((IMonitoringRecord) monitoringRecord);
final ByteArrayOutputStream boas = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(boas);
out.writeObject(monitoringRecord);
out.close();
byte[] message2 = boas.toByteArray(); // TODO optimize
System.arraycopy(message2, 0, invalidTracesMessages,
invalidTracesMessagesOffset, message2.length);
invalidTracesMessagesOffset += message2.length;
// flush once less than 200 bytes of headroom remain
if (invalidTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - 200)) { // TODO unsafe || writeQueue.isEmpty()
Bits.putInt(invalidTracesMessages, invalidTracesMessagesOffset,
-1);
sendMessage(invalidTracesMessages, "invalidTracesMaster");
invalidTracesMessagesOffset = 0;
}
}
catch (final IOException e) {
LOG.error("Error sending record", e);
}
}
@SuppressWarnings("unused")
private byte[] toByteArray(final IMonitoringRecord monitoringRecord) {
final Class<?>[] recordTypes = monitoringRecord.getValueTypes();
int arraySize = 4 + 8;
for (Class<?> recordType : recordTypes) {
if (recordType == String.class) {
arraySize += 4;
}
else if ((recordType == Integer.class) || (recordType == int.class)) {
arraySize += 4;
}
else if ((recordType == Long.class) || (recordType == long.class)) {
arraySize += 8;
}
else if ((recordType == Float.class) || (recordType == float.class)) {
arraySize += 4;
}
else if ((recordType == Double.class)
|| (recordType == double.class)) {
arraySize += 8;
}
else if ((recordType == Byte.class) || (recordType == byte.class)) {
arraySize += 1;
}
else if ((recordType == Short.class) || (recordType == short.class)) {
arraySize += 2;
}
else if ((recordType == Boolean.class)
|| (recordType == boolean.class)) {
arraySize += 1;
}
else {
arraySize += 1;
}
}
byte[] result = new byte[arraySize];
int offset = 0;
Bits.putInt(result, offset, getIdForString(monitoringRecord.getClass()
.getName()));
offset += 4;
Bits.putLong(result, offset, monitoringRecord.getLoggingTimestamp());
offset += 8;
final Object[] recordFields = monitoringRecord.toArray();
for (int i = 0; i < recordFields.length; i++) {
if (recordFields[i] == null) {
if (recordTypes[i] == String.class) {
Bits.putInt(result, offset, getIdForString(""));
offset += 4;
}
else if ((recordTypes[i] == Integer.class)
|| (recordTypes[i] == int.class)) {
Bits.putInt(result, offset, 0);
offset += 4;
}
else if ((recordTypes[i] == Long.class)
|| (recordTypes[i] == long.class)) {
Bits.putLong(result, offset, 0L);
offset += 8;
}
else if ((recordTypes[i] == Float.class)
|| (recordTypes[i] == float.class)) {
Bits.putFloat(result, offset, 0);
offset += 4;
}
else if ((recordTypes[i] == Double.class)
|| (recordTypes[i] == double.class)) {
Bits.putDouble(result, offset, 0);
offset += 8;
}
else if ((recordTypes[i] == Byte.class)
|| (recordTypes[i] == byte.class)) {
Bits.putByte(result, offset, (byte) 0);
offset += 1;
}
else if ((recordTypes[i] == Short.class)
|| (recordTypes[i] == short.class)) {
Bits.putShort(result, offset, (short) 0);
offset += 2;
}
else if ((recordTypes[i] == Boolean.class)
|| (recordTypes[i] == boolean.class)) {
Bits.putBoolean(result, offset, false);
offset += 1;
}
else {
LOG.warn("Record with unsupported recordField of type "
+ recordFields[i].getClass());
Bits.putByte(result, offset, (byte) 0);
offset += 1;
}
}
else if (recordFields[i] instanceof String) {
Bits.putInt(result, offset,
getIdForString((String) recordFields[i]));
offset += 4;
}
else if (recordFields[i] instanceof Integer) {
Bits.putInt(result, offset, (Integer) recordFields[i]);
offset += 4;
}
else if (recordFields[i] instanceof Long) {
Bits.putLong(result, offset, (Long) recordFields[i]);
offset += 8;
}
else if (recordFields[i] instanceof Float) {
Bits.putFloat(result, offset, (Float) recordFields[i]);
offset += 4;
}
else if (recordFields[i] instanceof Double) {
Bits.putDouble(result, offset, (Double) recordFields[i]);
offset += 8;
}
else if (recordFields[i] instanceof Byte) {
Bits.putByte(result, offset, (Byte) recordFields[i]);
offset += 1;
}
else if (recordFields[i] instanceof Short) {
Bits.putShort(result, offset, (Short) recordFields[i]);
offset += 2;
}
else if (recordFields[i] instanceof Boolean) {
Bits.putBoolean(result, offset, (Boolean) recordFields[i]);
offset += 1;
}
else {
LOG.warn("Record with unsupported recordField of type "
+ recordFields[i].getClass());
Bits.putByte(result, offset, (byte) 0);
offset += 1;
}
}
return result;
}
public int getIdForString(final String value) {
Integer result = stringReg.get(value);
if (result == null) {
final Integer newId = stringRegIndex.getAndIncrement();
// putIfAbsent avoids two concurrent callers registering different ids
result = stringReg.putIfAbsent(value, newId);
if (result == null) {
result = newId;
sendRegistryRecord(new StringRegistryRecord(newId, value));
}
}
return result;
}
private void sendMessage(final byte[] message, String queueName)
throws IOException {
synchronized (this) {
if (!connection.isOpen() || !channel.isOpen()) {
connect();
}
channel.basicPublish("", queueName, MessageProperties.BASIC,
message);
}
}
protected final void cleanup() {
disconnect();
}
private void disconnect() {
try {
if (channel != null) {
channel.close();
}
}
catch (final IOException e) {
LOG.info("Error closing connection", e);
}
try {
if (connection != null) {
connection.close();
}
}
catch (final IOException e) {
LOG.info("Error closing connection", e);
}
}
@Override
public final String toString() {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.toString());
sb.append("; Channel: '");
if (null != channel) {
sb.append(channel.toString());
}
else {
sb.append("null");
}
sb.append("'; Connection: '");
if (null != connection) {
sb.append(connection.toString());
}
else {
sb.append("null");
}
sb.append('\'');
return sb.toString();
}
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, providerUrl);
configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queue);
configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password);
configuration.setProperty(CONFIG_PROPERTY_USER, username);
return configuration;
}
}
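For orientation, a minimal sketch (not part of this commit; exception handling and queue plumbing elided, example id and signature made up) of the string-registry round trip implemented above: the connector Java-serializes a StringRegistryRecord the first time getIdForString sees a string and publishes it to "registryRecordsMaster"; the consumer side deserializes it and fills its local registry.

// Producer side, as in sendRegistryRecord(...)
final StringRegistryRecord record = new StringRegistryRecord(7, "bookstore.Catalog.getBook()"); // illustrative values
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeObject(record);
out.close();
final byte[] message = baos.toByteArray(); // published to "registryRecordsMaster"

// Consumer side, as in RabbitMQRegistryConsumer.readObjectFromBytes(...)
final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message));
final StringRegistryRecord received = (StringRegistryRecord) in.readObject();
// reader.addToRegistry(received.getId(), received.getObject());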
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package explorviz.hpc_monitoring.plugin;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.IFlowRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.TraceEventRecords;
import explorviz.hpc_monitoring.record.events.*;
/**
* @author Jan Waller
*
* @since 1.6
*/
@Plugin(name = "Trace Reconstruction Filter (Event)", description = "Filter to reconstruct event based (flow) traces", outputPorts = {
@OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, description = "Outputs valid traces", eventTypes = { TraceEventRecords.class }),
@OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, description = "Outputs traces missing crucial records", eventTypes = { TraceEventRecords.class }) }, configuration = {
@Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
@Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME),
@Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME) })
public final class EventRecordTraceReconstructionFilter extends
AbstractFilterPlugin {
/**
* The name of the output port delivering the valid traces.
*/
public static final String OUTPUT_PORT_NAME_TRACE_VALID = "validTraces";
/**
* The name of the output port delivering the invalid traces.
*/
public static final String OUTPUT_PORT_NAME_TRACE_INVALID = "invalidTraces";
/**
* The name of the input port receiving the trace records.
*/
public static final String INPUT_PORT_NAME_TRACE_RECORDS = "traceRecords";
/**
* The name of the property determining the time unit.
*/
public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
/**
* The name of the property determining the maximal trace duration.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration";
/**
* The name of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout";
/**
* The default value of the properties for the maximal trace duration and
* timeout.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE)
/**
* The default value of the time unit property (nanoseconds).
*/
public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
private static final Log LOG = LogFactory
.getLog(EventRecordTraceReconstructionFilter.class);
private final TimeUnit timeunit;
private final long maxTraceDuration;
private final long maxTraceTimeout;
private final boolean timeout;
private long maxEncounteredLoggingTimestamp = -1;
private final Map<Long, TraceBuffer> traceId2trace;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public EventRecordTraceReconstructionFilter(
final Configuration configuration,
final IProjectContext projectContext) {
super(configuration, projectContext);
final String recordTimeunitProperty = projectContext
.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
TimeUnit recordTimeunit;
try {
recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
}
catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen
LOG.warn(recordTimeunitProperty
+ " is no valid TimeUnit! Using NANOSECONDS instead.");
recordTimeunit = TimeUnit.NANOSECONDS;
}
timeunit = recordTimeunit;
final String configTimeunitProperty = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
TimeUnit configTimeunit;
try {
configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
}
catch (final IllegalArgumentException ex) {
LOG.warn(configTimeunitProperty
+ " is no valid TimeUnit! Using inherited value of "
+ timeunit.name() + " instead.");
configTimeunit = timeunit;
}
maxTraceDuration = timeunit.convert(configuration
.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION),
configTimeunit);
maxTraceTimeout = timeunit.convert(configuration
.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT),
configTimeunit);
timeout = !((maxTraceTimeout == Long.MAX_VALUE) && (maxTraceDuration == Long.MAX_VALUE));
traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>();
}
/**
* This method is the input port for the new events for this filter.
*
* @param record
* The new record to handle.
*/
@InputPort(name = INPUT_PORT_NAME_TRACE_RECORDS, description = "Reconstruct traces from incoming flow records", eventTypes = {
Trace.class, AbstractOperationEvent.class })
public void newEvent(final IFlowRecord record) {
final Long traceId;
TraceBuffer traceBuffer;
final long loggingTimestamp;
if (record instanceof Trace) {
traceId = ((Trace) record).getTraceId();
traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceBuffer();
traceId2trace.put(traceId, traceBuffer);
}
}
}
traceBuffer.setTrace((Trace) record);
loggingTimestamp = -1;
}
else if (record instanceof AbstractOperationEvent) {
traceId = ((AbstractOperationEvent) record).getTraceId();
traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceBuffer();
traceId2trace.put(traceId, traceBuffer);
}
}
}
traceBuffer.insertEvent((AbstractOperationEvent) record);
loggingTimestamp = ((AbstractOperationEvent) record)
.getLoggingTimestamp();
}
else {
return; // invalid type which should not happen due to the specified
// eventTypes
}
if (traceBuffer.isFinished()) {
synchronized (this) { // has to be synchronized because of timeout cleanup
traceId2trace.remove(traceId);
}
super.deliver(OUTPUT_PORT_NAME_TRACE_VALID,
traceBuffer.toTraceEvents());
}
if (timeout) {
synchronized (this) {
// can we assume a rough order of logging timestamps? (yes,
// except with DB reader)
if (loggingTimestamp > maxEncounteredLoggingTimestamp) {
maxEncounteredLoggingTimestamp = loggingTimestamp;
}
processTimeoutQueue(maxEncounteredLoggingTimestamp);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
synchronized (this) {
for (final Entry<Long, TraceBuffer> entry : traceId2trace
.entrySet()) {
final TraceBuffer traceBuffer = entry.getValue();
if (traceBuffer.isInvalid()) {
super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID,
traceBuffer.toTraceEvents());
}
else {
super.deliver(OUTPUT_PORT_NAME_TRACE_VALID,
traceBuffer.toTraceEvents());
}
}
traceId2trace.clear();
}
}
// only called within synchronized! We assume timestamps >= 0
private void processTimeoutQueue(final long timestamp) {
final long duration = timestamp - maxTraceDuration;
final long traceTimeout = timestamp - maxTraceTimeout;
for (final Iterator<Entry<Long, TraceBuffer>> iterator = traceId2trace
.entrySet().iterator(); iterator.hasNext();) {
final TraceBuffer traceBuffer = iterator.next().getValue();
if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long time no see
|| (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max duration is gone
if (traceBuffer.isInvalid()) {
super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID,
traceBuffer.toTraceEvents());
}
else {
super.deliver(OUTPUT_PORT_NAME_TRACE_VALID,
traceBuffer.toTraceEvents());
}
iterator.remove();
}
}
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT,
timeunit.name());
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION,
String.valueOf(maxTraceDuration));
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT,
String.valueOf(maxTraceTimeout));
return configuration;
}
/**
* The TraceBuffer is synchronized to prevent problems with concurrent
* access.
*
* @author Jan Waller
*/
private static final class TraceBuffer {
private static final Log LOG = LogFactory
.getLog(TraceBuffer.class);
private static final Comparator<AbstractOperationEvent> COMPARATOR = new TraceOperationComperator();
private Trace trace;
private final SortedSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>(
COMPARATOR);
private boolean closeable;
private boolean damaged;
private int openEvents;
private int maxOrderIndex = -1;
private long minLoggingTimestamp = Long.MAX_VALUE;
private long maxLoggingTimestamp = -1;
private long traceId = -1;
/**
* Creates a new instance of this class.
*/
public TraceBuffer() {
// default empty constructor
}
public void insertEvent(final AbstractOperationEvent record) {
final long myTraceId = record.getTraceId();
synchronized (this) {
if (traceId == -1) {
traceId = myTraceId;
}
else if (traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + traceId
+ " but found: " + myTraceId + " in event "
+ record.toString());
damaged = true;
}
final long loggingTimestamp = record.getLoggingTimestamp();
if (loggingTimestamp > maxLoggingTimestamp) {
maxLoggingTimestamp = loggingTimestamp;
}
if (loggingTimestamp < minLoggingTimestamp) {
minLoggingTimestamp = loggingTimestamp;
}
final int orderIndex = record.getOrderIndex();
if (orderIndex > maxOrderIndex) {
maxOrderIndex = orderIndex;
}
if (record instanceof BeforeOperationEvent) {
if (orderIndex == 0) {
closeable = true;
}
openEvents++;
}
else if (record instanceof AfterOperationEvent) {
openEvents--;
}
else if (record instanceof AfterFailedOperationEvent) {
openEvents--;
}
if (!events.add(record)) {
LOG.error("Duplicate entry for orderIndex " + orderIndex
+ " with traceId " + myTraceId);
damaged = true;
}
}
}
public void setTrace(final Trace trace) {
final long myTraceId = trace.getTraceId();
synchronized (this) {
if (traceId == -1) {
traceId = myTraceId;
}
else if (traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + traceId
+ " but found: " + myTraceId + " in trace "
+ trace.toString());
damaged = true;
}
if (this.trace == null) {
this.trace = trace;
}
else {
LOG.error("Duplicate Trace entry for traceId " + myTraceId);
damaged = true;
}
}
}
public boolean isFinished() {
synchronized (this) {
return closeable && !isInvalid();
}
}
public boolean isInvalid() {
synchronized (this) {
return (trace == null)
|| damaged
|| (openEvents != 0)
|| (((maxOrderIndex + 1) != events.size()) || events
.isEmpty());
}
}
public TraceEventRecords toTraceEvents() {
synchronized (this) {
return new TraceEventRecords(
trace,
events.toArray(new AbstractOperationEvent[events.size()]));
}
}
public long getMaxLoggingTimestamp() {
synchronized (this) {
return maxLoggingTimestamp;
}
}
public long getMinLoggingTimestamp() {
synchronized (this) {
return minLoggingTimestamp;
}
}
/**
* @author Jan Waller
*/
private static final class TraceOperationComperator implements
Comparator<AbstractOperationEvent> {
public TraceOperationComperator() {}
public int compare(final AbstractOperationEvent o1,
final AbstractOperationEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
}
}
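A configuration sketch for this filter (illustrative values; the controller variable stands for any IAnalysisController): finite duration and timeout values make processTimeoutQueue evict stale traces instead of holding every unfinished trace until terminate().

final Configuration cfg = new Configuration();
cfg.setProperty(EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT,
TimeUnit.MILLISECONDS.name());
cfg.setProperty(EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION,
"60000"); // a whole trace must complete within 60 s
cfg.setProperty(EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT,
"10000"); // no new event for 10 s evicts the trace
final EventRecordTraceReconstructionFilter reconstruction =
new EventRecordTraceReconstructionFilter(cfg, controller);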
package explorviz.hpc_monitoring.plugin;
import java.util.*;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import explorviz.hpc_monitoring.record.TraceEventRecords;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
/**
* This filter collects incoming traces for a specified amount of time.
* All traces representing the same series of events are used to calculate
* statistical information such as the average runtime of this kind of trace.
* Only one specimen of these traces, carrying this information, is forwarded
* from this filter.
*
* Statistical outliers regarding the runtime of a trace are treated
* specially: they are sent out as they are and are not mixed with others.
*
*
* @author Florian Biss
*
* @since 1.8
*/
@Plugin(description = "This filter tries to aggregate similar TraceEventRecordss into a single trace.", outputPorts = { @OutputPort(name = TraceEventRecordAggregationFilter.OUTPUT_PORT_NAME_TRACES, description = "Output port for the processed traces", eventTypes = { TraceEventRecords.class }) }, configuration = {
@Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
@Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION),
@Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION) })
public class TraceEventRecordAggregationFilter extends AbstractFilterPlugin {
/**
* The name of the output port delivering the valid traces.
*/
public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut";
/**
* The name of the input port receiving the trace records.
*/
public static final String INPUT_PORT_NAME_TRACES = "tracesIn";
/**
* The name of the property determining the time unit.
*/
public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
/**
* Clock input for timeout handling.
*/
public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp";
/**
* The default value of the time unit property (nanoseconds).
*/
public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
/**
* The name of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration";
/**
* The default value of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "4000000000";
/**
* The name of the property determining the maximal runtime deviation
* factor.
*
* Outliers are indicated by
* <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code>.
* Use a negative number to aggregate all traces.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation";
/**
* The default value of the property determining the maximal runtime
* deviation factor.
* The default of -1 disables outlier detection and aggregates all traces.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "-1";
private final TimeUnit timeunit;
private final long maxCollectionDuration;
private final long maxDeviation;
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
public TraceEventRecordAggregationFilter(final Configuration configuration,
final IProjectContext projectContext) {
super(configuration, projectContext);
final String recordTimeunitProperty = projectContext
.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
TimeUnit recordTimeunit;
try {
recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
}
catch (final IllegalArgumentException ex) {
recordTimeunit = TimeUnit.NANOSECONDS;
}
timeunit = recordTimeunit;
maxDeviation = configuration
.getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION);
maxCollectionDuration = timeunit.convert(configuration
.getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION),
timeunit);
trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(
new TraceComperator());
}
@InputPort(name = INPUT_PORT_NAME_TRACES, description = "Collect identical traces and aggregate them.", eventTypes = { TraceEventRecords.class })
public void newEvent(final TraceEventRecords event) {
synchronized (this) {
insertIntoBuffer(event);
}
}
private void insertIntoBuffer(final TraceEventRecords trace) {
TraceAggregationBuffer traceBuffer;
traceBuffer = trace2buffer.get(trace);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = trace2buffer.get(trace);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAggregationBuffer(System.nanoTime());
trace2buffer.put(trace, traceBuffer);
}
}
}
synchronized (this) {
traceBuffer.insertTrace(trace);
}
}
@InputPort(name = INPUT_PORT_NAME_TIME_EVENT, description = "Time signal for timeouts", eventTypes = { Long.class })
public void newEvent(final Long timestamp) {
synchronized (this) {
processTimeoutQueue(timestamp);
}
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
synchronized (this) {
for (final TraceAggregationBuffer traceBuffer : trace2buffer
.values()) {
super.deliver(OUTPUT_PORT_NAME_TRACES,
traceBuffer.getAggregatedTrace());
}
trace2buffer.clear();
}
}
private void processTimeoutQueue(final long timestamp) {
final long bufferTimeout = timestamp - maxCollectionDuration;
List<TraceEventRecords> toRemove = new ArrayList<TraceEventRecords>();
for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) {
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) {
TraceEventRecords aggregatedTrace = traceBuffer
.getAggregatedTrace();
super.deliver(OUTPUT_PORT_NAME_TRACES, aggregatedTrace);
toRemove.add(aggregatedTrace);
}
}
for (TraceEventRecords traceEventRecords : toRemove) {
trace2buffer.remove(traceEventRecords);
}
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT,
timeunit.name());
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION,
String.valueOf(maxCollectionDuration));
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION,
String.valueOf(maxDeviation));
return configuration;
}
private static final class TraceAggregationBuffer {
private TraceEventRecords accumulator;
private final long bufferCreatedTimestamp;
public TraceAggregationBuffer(final long bufferCreatedTimestamp) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
}
public long getBufferCreatedTimestamp() {
return bufferCreatedTimestamp;
}
public TraceEventRecords getAggregatedTrace() {
return accumulator;
}
public void insertTrace(final TraceEventRecords trace) {
aggregate(trace);
}
private void aggregate(final TraceEventRecords trace) {
if (accumulator == null) {
accumulator = trace;
}
else {
final AbstractOperationEvent[] aggregatedRecords = accumulator
.getTraceOperations();
final AbstractOperationEvent[] records = trace
.getTraceOperations();
for (int i = 0; i < aggregatedRecords.length; i++) {
aggregatedRecords[i].getRuntime().merge(
records[i].getRuntime());
}
accumulator.getRuntime().merge(trace.getRuntime());
}
}
}
private static final class TraceComperator implements
Comparator<TraceEventRecords> {
public TraceComperator() {}
public int compare(final TraceEventRecords t1,
final TraceEventRecords t2) {
final AbstractOperationEvent[] recordsT1 = t1.getTraceOperations();
final AbstractOperationEvent[] recordsT2 = t2.getTraceOperations();
if ((recordsT1.length - recordsT2.length) != 0) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTrace().getHostname()
.compareTo(t2.getTrace().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
// TODO deep check records
return 0;
}
}
}
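Note that maxDeviation is read and reported by this filter but not yet applied when aggregating. A hypothetical helper (name and signature are not from this commit) illustrating the outlier rule documented above:

// A trace is an outlier iff |runtime - averageRuntime| > deviationFactor * standardDeviation.
static boolean isOutlier(final long runtime, final double averageRuntime,
final double standardDeviation, final long deviationFactor) {
if (deviationFactor < 0) {
return false; // negative factor (default -1): aggregate all traces
}
return Math.abs(runtime - averageRuntime) > (deviationFactor * standardDeviation);
}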
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package explorviz.hpc_monitoring.reader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.zip.*;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import com.rabbitmq.client.*;
import explorviz.hpc_monitoring.Bits;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.*;
/**
* Reads monitoring records from the queue of an established RabbitMQ
* connection.
*
* @author Santje Finke
*
* @since 1.8
*/
@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") }, configuration = {
@Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "kieker"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"),
@Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672")
})
public final class RabbitMQReader extends AbstractReaderPlugin {
/** The name of the output port delivering the received records. */
public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
/** The name of the configuration determining the RabbitMQ provider URL. */
public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl";
/**
* The name of the configuration determining the RabbitMQ Queue (e.g.
* queue1).
*/
public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination";
/** The name of the property determining the username that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_USER = "username";
/** The name of the property determining the password that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_PASSWORD = "password";
/** The name of the property determining the port that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_PORT = "port";
static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD (package visible for inner class)
private final String providerUrl;
private final String queueName;
private final String password;
private final String username;
private final int port;
private Connection connection;
private Channel channel;
private ConnectionFactory factory;
private QueueingConsumer normalConsumer;
private final CountDownLatch cdLatch = new CountDownLatch(
1);
private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(
16,
0.75f,
2);
private final RabbitMQRegistryConsumer registryConsumer;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration used to initialize the whole reader. Keep in
* mind that the configuration should contain the following
* properties:
* <ul>
* <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL},
* e.g. {@code localhost}
* <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g.
* {@code queue1}
* <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g.
* {@code password}
* <li>The property {@link #CONFIG_PROPERTY_USER}, e.g.
* {@code username}
* <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g.
* {@code port}
* </ul>
* @param projectContext
* The project context for this component.
*
* @throws IllegalArgumentException
* If one of the properties is empty.
*/
public RabbitMQReader(final Configuration configuration,
final IProjectContext projectContext)
throws IllegalArgumentException {
super(configuration, projectContext);
// Initialize the reader based on the given configuration.
providerUrl = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL);
queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE);
username = configuration.getStringProperty(CONFIG_PROPERTY_USER);
password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD);
port = configuration.getIntProperty(CONFIG_PROPERTY_PORT);
// simple sanity check
if ((providerUrl.length() == 0) || (queueName.length() == 0)) {
throw new IllegalArgumentException(
"RabbitMQReader has insufficient parameters: providerUrl ('"
+ providerUrl + "') or destination ('" + queueName
+ "') is empty");
}
registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl,
"registryRecords", username, password, port);
registryConsumer.start();
}
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration used to initialize the whole reader. Keep in
* mind that the configuration should contain the following
* properties:
* <ul>
* <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL},
* e.g. {@code localhost}
* <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g.
* {@code queue1}
* <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g.
* {@code password}
* <li>The property {@link #CONFIG_PROPERTY_USER}, e.g.
* {@code username}
* </ul>
*
* @throws IllegalArgumentException
* If one of the properties is empty.
*
* @deprecated To be removed in Kieker 1.8.
*/
@Deprecated
public RabbitMQReader(final Configuration configuration)
throws IllegalArgumentException {
this(configuration, null);
}
/**
* A call to this method is a blocking call.
*
* @return true if the method succeeds, false otherwise.
*/
public boolean read() {
boolean retVal = true;
try {
createConnectionFactory();
connect();
while (!Thread.interrupted()) {
if (!connection.isOpen() || !channel.isOpen()) {
reconnect();
}
final QueueingConsumer.Delivery delivery = normalConsumer
.nextDelivery();
byte[] batchedMessages = delivery.getBody();
messagesfromByteArray(batchedMessages);
normalConsumer.getChannel().basicAck(
delivery.getEnvelope().getDeliveryTag(), false);
}
}
catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck)
LOG.error("Error in read()", ex);
retVal = false;
}
catch (final ShutdownSignalException e) {
LOG.error("Error in read(): ShutdownSignal Exception", e);
retVal = false;
}
catch (final ConsumerCancelledException e) {
LOG.error("Error in read(): ConsumerCancelled Exception", e);
retVal = false;
}
catch (final InterruptedException e) {
LOG.error("Error in read(): Interrupted Exception", e);
retVal = false;
}
finally {
try {
if (connection != null) {
connection.close();
}
}
catch (final IOException e) {
LOG.error("Error in read()", e);
}
}
return retVal;
}
public byte[] decompressByteArray(final byte[] bytes) {
final Inflater iflr = new Inflater();
iflr.setInput(bytes);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final byte[] tmp = new byte[4096];
try {
while (!iflr.finished()) {
final int size = iflr.inflate(tmp);
baos.write(tmp, 0, size);
}
}
catch (final DataFormatException ex) {
LOG.error("Error decompressing message", ex);
}
finally {
iflr.end(); // release native inflater resources
}
return baos.toByteArray();
}
private void createConnectionFactory() {
factory = new ConnectionFactory();
factory.setHost(providerUrl);
factory.setPort(port);
factory.setConnectionTimeout(0);
factory.setUsername(username);
factory.setPassword(password);
}
private void connect() throws IOException {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
normalConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, normalConsumer);
channel.basicQos(50);
}
private void reconnect() {
try {
connect();
}
catch (final IOException e) {
RabbitMQReader.LOG.error("Error reestablishing connection", e);
}
}
private void messagesfromByteArray(final byte[] b) {
int offset = 0;
int firstValue;
while ((firstValue = Bits.getInt(b, offset)) != -1) {
offset += 4;
IMonitoringRecord record = null;
switch (firstValue) {
case 0: {
final long traceId = Bits.getLong(b, offset);
offset += 8;
final Integer hostnameId = Bits.getInt(b, offset);
offset += 4;
final long parentTraceId = Bits.getLong(b, offset);
offset += 8;
final int parentOrderId = Bits.getInt(b, offset);
offset += 4;
offset += 4; // dummy for nextOrderIndex
record = new Trace(traceId,
getStringFromRegistry(hostnameId), parentTraceId,
parentOrderId);
break;
}
case 1: {
final long timestamp = Bits.getLong(b, offset);
offset += 8;
final long traceId = Bits.getLong(b, offset);
offset += 8;
final int orderIndex = Bits.getInt(b, offset);
offset += 4;
final Integer operationId = Bits.getInt(b, offset);
offset += 4;
record = new BeforeOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
case 2: {
final long timestamp = Bits.getLong(b, offset);
offset += 8;
final long traceId = Bits.getLong(b, offset);
offset += 8;
final int orderIndex = Bits.getInt(b, offset);
offset += 4;
final Integer operationId = Bits.getInt(b, offset);
offset += 4;
final Integer causeId = Bits.getInt(b, offset);
offset += 4;
record = new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId),
getStringFromRegistry(causeId));
break;
}
case 3: {
final long timestamp = Bits.getLong(b, offset);
offset += 8;
final long traceId = Bits.getLong(b, offset);
offset += 8;
final int orderIndex = Bits.getInt(b, offset);
offset += 4;
final Integer operationId = Bits.getInt(b, offset);
offset += 4;
record = new AfterOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
default: {
LOG.error("unknown class id " + firstValue);
return; // cannot resynchronize within this batch
}
}
if (!super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) {
LOG.error("deliverRecord returned false");
}
}
}
final void unblock() { // NOPMD (package visible for inner class)
cdLatch.countDown();
}
/**
* {@inheritDoc}
*/
public void terminate(final boolean error) {
LOG.info("Shutdown of RabbitMQReader requested.");
registryConsumer.interrupt();
unblock();
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration
.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl);
configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, queueName);
configuration.setProperty(CONFIG_PROPERTY_USER, username);
configuration.setProperty(CONFIG_PROPERTY_PASSWORD, password);
return configuration;
}
public void addToRegistry(final Integer key, final String value) {
stringRegistry.put(key, value);
synchronized (this) {
notifyAll(); // wake readers blocked in getStringFromRegistry
}
}
private String getStringFromRegistry(final Integer id) {
String result = stringRegistry.get(id);
while (result == null) {
synchronized (this) {
// re-check inside the lock so a concurrent notifyAll() is not missed
result = stringRegistry.get(id);
if (result == null) {
try {
LOG.info("Waiting for string registry entry " + id);
this.wait();
}
catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for registry entry", e);
}
}
}
result = stringRegistry.get(id);
}
return result;
}
}
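messagesfromByteArray fixes the batch wire format: a 4-byte class id (0 = Trace, 1 = BeforeOperationEvent, 2 = AfterFailedOperationEvent, 3 = AfterOperationEvent) followed by that record's fixed-size fields, repeated until a terminating -1 int. A writer-side sketch for a single BeforeOperationEvent, using the project's Bits helper as imported above (buffer size and field values are illustrative):

final byte[] batch = new byte[1500];
int offset = 0;
Bits.putInt(batch, offset, 1); // class id 1: BeforeOperationEvent
offset += 4;
Bits.putLong(batch, offset, 123456789L); // loggingTimestamp
offset += 8;
Bits.putLong(batch, offset, 42L); // traceId
offset += 8;
Bits.putInt(batch, offset, 0); // orderIndex
offset += 4;
Bits.putInt(batch, offset, 7); // operationId, resolved via the string registry
offset += 4;
Bits.putInt(batch, offset, -1); // terminator the reader's loop stops on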
package explorviz.hpc_monitoring.reader;
import java.io.*;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import com.rabbitmq.client.*;
import explorviz.hpc_monitoring.StringRegistryRecord;
public class RabbitMQRegistryConsumer extends Thread {
static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD (package visible for inner class)
private final RabbitMQReader parent;
private Connection connection;
private Channel channel;
private ConnectionFactory factory;
private QueueingConsumer registryConsumer;
private final String providerUrl;
private final String queueName;
private final String password;
private final String username;
private final int port;
public RabbitMQRegistryConsumer(final RabbitMQReader parent,
final String providerUrl, final String queueName,
final String username, final String password, final int port) {
// parameter order matches the call site in RabbitMQReader (username before password)
this.parent = parent;
this.providerUrl = providerUrl;
this.queueName = queueName;
this.username = username;
this.password = password;
this.port = port;
}
@Override
public void run() {
try {
createConnectionFactory();
connect();
while (!Thread.interrupted()) {
if (!connection.isOpen() || !channel.isOpen()) {
reconnect();
}
final QueueingConsumer.Delivery delivery = registryConsumer
.nextDelivery();
final Object message = readObjectFromBytes(delivery.getBody());
if (message instanceof StringRegistryRecord) {
final StringRegistryRecord record = (StringRegistryRecord) message;
parent.addToRegistry(record.getId(), record.getObject());
}
}
}
catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck)
LOG.error("Error in read()", ex);
}
catch (final ClassNotFoundException e) {
LOG.error("Error in read(): ClassNotFound Exception", e);
}
catch (final ShutdownSignalException e) {
LOG.error("Error in read(): ShutdownSignal Exception", e);
}
catch (final ConsumerCancelledException e) {
LOG.error("Error in read(): ConsumerCancelled Exception", e);
}
catch (final InterruptedException e) {
LOG.error("Error in read(): Interrupted Exception", e);
}
finally {
try {
if (connection != null) {
connection.close();
}
}
catch (final IOException e) {
LOG.error("Error in read()", e);
}
}
}
private void createConnectionFactory() {
factory = new ConnectionFactory();
factory.setHost(providerUrl);
factory.setPort(port);
factory.setConnectionTimeout(0);
factory.setUsername(username);
factory.setPassword(password);
}
private void connect() throws IOException {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
registryConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, registryConsumer);
}
private void reconnect() {
try {
connect();
}
catch (final IOException e) {
RabbitMQReader.LOG.error("Error reestablishing connection", e);
}
}
private Object readObjectFromBytes(final byte[] bs) throws IOException,
ClassNotFoundException {
final ByteArrayInputStream bain = new ByteArrayInputStream(bs);
final ObjectInputStream in = new ObjectInputStream(bain);
final Object message = in.readObject();
return message;
}
}
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package explorviz.hpc_monitoring.record;
import java.io.Serializable;
/**
* Provides methods to calculate the minimum, maximum, average and the standard
* deviation of numerical data.
*
* @author Florian Biss
*
* @since 1.8
*/
public final class RuntimeStatisticInformation implements Serializable {
private static final long serialVersionUID = -1628273045707598143L;
private long count = 0;
private long min = Long.MAX_VALUE;
private long max = Long.MIN_VALUE;
// Using doubles to handle numbers above Integer.MAX_VALUE. Overflows are
// much worse than losing precision here.
private double sum;
private double squareSum;
public RuntimeStatisticInformation(final long runtime) {
set(runtime);
}
public long getCount() {
return count;
}
public long getMin() {
return min;
}
public long getMax() {
return max;
}
public long getAvg() {
if (count > 0) {
return (long) (sum / count);
}
else {
return -1;
}
}
public long getStandardDeviation() {
if (count <= 2) {
return -1;
}
else {
final double variance = (squareSum - ((sum * sum) / count))
/ (count - 1);
return (long) Math.sqrt(variance);
}
}
public double getSum() {
return sum;
}
public double getSquareSum() {
return squareSum;
}
public void insert(final long data) {
count++;
final double dataDouble = data;
sum += dataDouble;
squareSum += dataDouble * dataDouble;
min = Math.min(data, min);
max = Math.max(data, max);
}
public void set(final long data) {
// Note: accumulation of sum/squareSum/min/max is currently disabled;
// only the count is tracked.
count = 1;
// final double dataDouble = data;
// sum = dataDouble;
// squareSum = dataDouble * dataDouble;
// max = data;
// min = data;
}
public void merge(final RuntimeStatisticInformation statistics) {
count += statistics.getCount();
// sum += statistics.getSum();
// squareSum += statistics.getSquareSum();
// min = Math.min(statistics.getMin(), min);
// max = Math.max(statistics.getMax(), max);
}
@Override
public String toString() {
return count + ":" + min + ":" + max + ":" + sum + ":" + squareSum;
}
}
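getStandardDeviation() uses the shortcut variance = (squareSum - sum^2/count) / (count - 1). A worked check on the runtimes {2, 4, 6} (arithmetic only; note that with the accumulation in set(...) commented out, an actual instance would miss the first sample):

// sum = 2 + 4 + 6 = 12
// squareSum = 4 + 16 + 36 = 56
// variance = (56 - 12*12/3) / (3 - 1) = (56 - 48) / 2 = 4
// standard deviation = sqrt(4) = 2
final RuntimeStatisticInformation stats = new RuntimeStatisticInformation(2);
stats.insert(4);
stats.insert(6);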
package explorviz.hpc_monitoring.record;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class Trace implements IFlowRecord {
private static final long serialVersionUID = 4028885353683254444L;
private final long traceId;
private final String hostname;
private final long parentTraceId;
private final int parentOrderId;
public Trace(long traceId, String hostname, long parentTraceId,
int parentOrderId) {
this.traceId = traceId;
this.hostname = hostname;
this.parentTraceId = parentTraceId;
this.parentOrderId = parentOrderId;
}
@Override
public long getLoggingTimestamp() {
throw new UnsupportedOperationException();
}
@Override
public Class<?>[] getValueTypes() {
throw new UnsupportedOperationException();
}
@Override
public void initFromArray(Object[] arg0) {
throw new UnsupportedOperationException();
}
@Override
public void setLoggingTimestamp(long arg0) {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public int compareTo(IMonitoringRecord o) {
throw new UnsupportedOperationException();
}
public long getTraceId() {
return traceId;
}
public String getHostname() {
return hostname;
}
public long getParentTraceId() {
return parentTraceId;
}
public int getParentOrderId() {
return parentOrderId;
}
}
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package explorviz.hpc_monitoring.record;
import java.io.Serializable;
import java.util.Arrays;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
public final class TraceEventRecords implements Serializable {
private static final long serialVersionUID = 8589405631073291022L;
private final Trace trace;
private final AbstractOperationEvent[] traceEvents;
private final RuntimeStatisticInformation runtimeInformation;
/**
* Creates a new instance of this class using the given parameters.
*
* @param trace
* The trace to be stored in this object.
* @param traceEvents
* The trace events to be stored in this object.
*/
public TraceEventRecords(final Trace trace,
final AbstractOperationEvent[] traceEvents) { // NOPMD (stored directly)
this.trace = trace;
this.traceEvents = traceEvents;
runtimeInformation = new RuntimeStatisticInformation(1); // TODO
}
public Trace getTrace() {
return trace;
}
public AbstractOperationEvent[] getTraceOperations() {
return traceEvents; // NOPMD (internal array exposed)
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(64);
sb.append(super.toString());
sb.append("\n\tTrace: ");
sb.append(trace);
for (final AbstractOperationEvent traceEvent : traceEvents) {
sb.append("\n\t");
sb.append(traceEvent.getClass().getSimpleName());
sb.append(": ");
sb.append(traceEvent);
}
sb.append("\n\t");
sb.append(runtimeInformation.getClass().getSimpleName());
sb.append(": ");
sb.append(runtimeInformation);
sb.append('\n');
return sb.toString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = (prime * result) + ((trace == null) ? 0 : trace.hashCode()); // NOCS (?:)
result = (prime * result) + Arrays.hashCode(traceEvents);
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (this.getClass() != obj.getClass()) {
return false;
}
final TraceEventRecords other = (TraceEventRecords) obj;
if (trace == null) {
if (other.trace != null) {
return false;
}
}
else if (!trace.equals(other.trace)) {
return false;
}
if (!Arrays.equals(traceEvents, other.traceEvents)) {
return false;
}
return true;
}
public RuntimeStatisticInformation getRuntime() {
return runtimeInformation;
}
}
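A small sketch (illustrative values, not from this commit) of why equals/hashCode deliberately ignore runtimeInformation: TraceEventRecordAggregationFilter uses TraceEventRecords as a map key, so traces with identical events must stay equal while their runtime statistics are merged.

final Trace trace = new Trace(42L, "host-1", -1L, -1);
final AbstractOperationEvent[] events = {
new BeforeOperationEvent(100L, 42L, 0, "op()"),
new AfterOperationEvent(200L, 42L, 1, "op()") };
final TraceEventRecords a = new TraceEventRecords(trace, events);
final TraceEventRecords b = new TraceEventRecords(trace, events);
assert a.equals(b); // equality rests on trace and events only
a.getRuntime().merge(b.getRuntime()); // merging statistics does not break it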
package explorviz.hpc_monitoring.record.events;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
import explorviz.hpc_monitoring.record.RuntimeStatisticInformation;
public abstract class AbstractOperationEvent implements IFlowRecord {
private static final long serialVersionUID = 1224383944280820758L;
private final long timestamp;
private final long traceId;
private final int orderIndex;
private final String operationSignature;
private final RuntimeStatisticInformation runtimeStatisticInformation;
public AbstractOperationEvent(long timestamp, long traceId, int orderIndex,
String operationSignature) {
super();
this.timestamp = timestamp;
this.traceId = traceId;
this.orderIndex = orderIndex;
this.operationSignature = operationSignature;
runtimeStatisticInformation = new RuntimeStatisticInformation(timestamp);
}
@Override
public long getLoggingTimestamp() {
return timestamp;
}
@Override
public Class<?>[] getValueTypes() {
throw new UnsupportedOperationException();
}
@Override
public void initFromArray(Object[] arg0) {
throw new UnsupportedOperationException();
}
@Override
public void setLoggingTimestamp(long arg0) {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public int compareTo(IMonitoringRecord o) {
throw new UnsupportedOperationException();
}
public long getTraceId() {
return traceId;
}
public int getOrderIndex() {
return orderIndex;
}
public String getOperationSignature() {
return operationSignature;
}
public RuntimeStatisticInformation getRuntime() {
return runtimeStatisticInformation;
}
}
package explorviz.hpc_monitoring.record.events;
public class AfterFailedOperationEvent extends AbstractOperationEvent {
private static final long serialVersionUID = -7488112271381329123L;
private final String cause;
public AfterFailedOperationEvent(long timestamp, long traceId,
int orderIndex, String operationSignature, String cause) {
super(timestamp, traceId, orderIndex, operationSignature);
this.cause = cause;
}
public String getCause() {
return cause;
}
}
package explorviz.hpc_monitoring.record.events;
public class AfterOperationEvent extends AbstractOperationEvent {
private static final long serialVersionUID = 6136529395371343882L;
public AfterOperationEvent(long timestamp, long traceId, int orderIndex,
String operationSignature) {
super(timestamp, traceId, orderIndex, operationSignature);
}
}
package explorviz.hpc_monitoring.record.events;
public class BeforeOperationEvent extends AbstractOperationEvent {
private static final long serialVersionUID = 1465085019211054059L;
public BeforeOperationEvent(long timestamp, long traceId, int orderIndex,
String operationSignature) {
super(timestamp, traceId, orderIndex, operationSignature);
}
}
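/*
 * Illustrative sketch, not part of the commit: how the event records above fit
 * together. A monitored operation with one trace id emits a BeforeOperationEvent
 * and a matching AfterOperationEvent (or AfterFailedOperationEvent on failure)
 * with increasing order indices; the reconstruction filters later reassemble the
 * trace from exactly these two keys. All literal values below are made-up
 * example data.
 */
class OperationEventPairingSketch {
    public static void main(final String[] args) {
        final long traceId = 42L;
        final String signature = "public void Bookstore.searchBook()";
        final BeforeOperationEvent before = new BeforeOperationEvent(
                System.nanoTime(), traceId, 0, signature);
        final AfterOperationEvent after = new AfterOperationEvent(
                System.nanoTime(), traceId, 1, signature);
        // Events of one trace share the trace id and are ordered by orderIndex.
        System.out.println(before.getTraceId() == after.getTraceId()); // true
        System.out.println(before.getOrderIndex() < after.getOrderIndex()); // true
    }
}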
......@@ -3,11 +3,14 @@ package explorviz.worker.main
import kieker.analysis.AnalysisController
import kieker.common.configuration.Configuration
import kieker.analysis.IAnalysisController
import kieker.analysis.plugin.reader.mq.RabbitMQReader
import kieker.analysis.plugin.filter.forward.CountingFilter
import kieker.analysis.plugin.filter.forward.TeeFilter
import kieker.analysis.plugin.filter.forward.CountingThroughputFilter
import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter
import explorviz.hpc_monitoring.reader.RabbitMQReader
import kieker.analysis.plugin.reader.timer.TimeReader
import explorviz.hpc_monitoring.plugin.EventRecordTraceReconstructionFilter
import explorviz.hpc_monitoring.plugin.TraceEventRecordAggregationFilter
import explorviz.hpc_monitoring.connector.RabbitMQConnector
class WorkerController {
......@@ -17,17 +20,39 @@ class WorkerController {
analysisInstance = new AnalysisController()
val rabbitMQ = initRabbitMQ()
val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter()
val aggregationFilter = initAggregationFilter()
val timer = initTimer()
// val countingThroughputFilter = initCountingThroughputFilter()
// val teeFilter = initTeeFilter()
val rabbitMQConnector = initRabbitMQConnector()
val config = new Configuration()
// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000")
// val countingFilter = new CountingThroughputFilter(config, analysisInstance)
val teeFilter = initTeeFilter()
analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
// analysisInstance.connect(eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, rabbitMQConnector,
RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
val eventTraceReconstructionFilter =
new EventRecordTraceReconstructionFilter(config, analysisInstance);
analysisInstance.connect(timer,
TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
analysisInstance.connect(rabbitMQ, RabbitMQReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
analysisInstance.connect(eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, teeFilter, TeeFilter::INPUT_PORT_NAME_EVENTS)
// analysisInstance.connect(aggregationFilter,
// TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, teeFilter,
// TeeFilter::INPUT_PORT_NAME_EVENTS)
analysisInstance.connect(aggregationFilter,
TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, rabbitMQConnector,
RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
try {
analysisInstance.run()
......@@ -43,9 +68,34 @@ class WorkerController {
new RabbitMQReader(rabbitConfig, analysisInstance)
}
def initCountingFilter() {
def initRabbitMQConnector() {
val rabbitConfig = new Configuration()
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "localhost")
rabbitConfig.setProperty(RabbitMQReader::CONFIG_PROPERTY_NAME_QUEUE, "validTraces")
new RabbitMQConnector(rabbitConfig, analysisInstance)
}
def initAggregationFilter() {
val config = new Configuration()
new TraceEventRecordAggregationFilter(config, analysisInstance)
}
def initTimer() {
val config = new Configuration()
config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, "1000000000")
config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_DELAY_NS, "0")
new TimeReader(config, analysisInstance)
}
def initCountingThroughputFilter() {
val config = new Configuration()
config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000")
new CountingThroughputFilter(config, analysisInstance)
}
def initEventRecordTraceReconstructionFilter() {
val config = new Configuration()
new CountingFilter(config, analysisInstance)
new EventRecordTraceReconstructionFilter(config, analysisInstance)
}
def initTeeFilter() {
......
package kieker.analysis.plugin;
public class Bits {
public static boolean getBoolean(byte[] b, int off) {
return b[off] != 0;
}
public static char getChar(byte[] b, int off) {
return (char) ((b[off + 1] & 0xFF) + (b[off] << 8));
}
public static short getShort(byte[] b, int off) {
return (short) ((b[off + 1] & 0xFF) + (b[off] << 8));
}
public static int getInt(byte[] b, int off) {
return ((b[off + 3] & 0xFF)) + ((b[off + 2] & 0xFF) << 8)
+ ((b[off + 1] & 0xFF) << 16) + ((b[off]) << 24);
}
public static float getFloat(byte[] b, int off) {
return Float.intBitsToFloat(getInt(b, off));
}
public static long getLong(byte[] b, int off) {
return ((b[off + 7] & 0xFFL)) + ((b[off + 6] & 0xFFL) << 8)
+ ((b[off + 5] & 0xFFL) << 16) + ((b[off + 4] & 0xFFL) << 24)
+ ((b[off + 3] & 0xFFL) << 32) + ((b[off + 2] & 0xFFL) << 40)
+ ((b[off + 1] & 0xFFL) << 48) + (((long) b[off]) << 56);
}
public static double getDouble(byte[] b, int off) {
return Double.longBitsToDouble(getLong(b, off));
}
public static byte getByte(byte[] b, int off) {
return b[off];
}
public static void putBoolean(byte[] b, int off, boolean val) {
b[off] = (byte) (val ? 1 : 0);
}
public static void putChar(byte[] b, int off, char val) {
b[off + 1] = (byte) (val);
b[off] = (byte) (val >>> 8);
}
public static void putShort(byte[] b, int off, short val) {
b[off + 1] = (byte) (val);
b[off] = (byte) (val >>> 8);
}
public static void putInt(byte[] b, int off, int val) {
b[off + 3] = (byte) (val);
b[off + 2] = (byte) (val >>> 8);
b[off + 1] = (byte) (val >>> 16);
b[off] = (byte) (val >>> 24);
}
public static void putFloat(byte[] b, int off, float val) {
putInt(b, off, Float.floatToIntBits(val));
}
public static void putLong(byte[] b, int off, long val) {
b[off + 7] = (byte) (val);
b[off + 6] = (byte) (val >>> 8);
b[off + 5] = (byte) (val >>> 16);
b[off + 4] = (byte) (val >>> 24);
b[off + 3] = (byte) (val >>> 32);
b[off + 2] = (byte) (val >>> 40);
b[off + 1] = (byte) (val >>> 48);
b[off] = (byte) (val >>> 56);
}
public static void putDouble(byte[] b, int off, double val) {
putLong(b, off, Double.doubleToLongBits(val));
}
public static void putByte(byte[] b, int off, byte val) {
b[off] = val;
}
}
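/*
 * Illustrative sketch, not part of the commit: the Bits helper above packs
 * primitives big-endian (most significant byte first), mirroring
 * java.io.DataOutput, so a put followed by a get at the same offset is an
 * identity. A tiny round-trip check:
 */
class BitsRoundTripSketch {
    public static void main(final String[] args) {
        final byte[] buffer = new byte[12];
        Bits.putInt(buffer, 0, 0xCAFEBABE);
        Bits.putLong(buffer, 4, 123456789L);
        System.out.println(Bits.getInt(buffer, 0) == 0xCAFEBABE); // true
        System.out.println(Bits.getLong(buffer, 4) == 123456789L); // true
        System.out.println(buffer[0] == (byte) 0xCA); // true: MSB comes first
    }
}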
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.plugin.connector.mq;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
/**
* A plugin used for Kieker in the cloud.
* All incoming events are published to a specified RabbitMQ queue, but are also passed to an output
* port that can be used for testing purposes.
*
* @author Santje Finke
*
* @since 1.8
*
*/
@Plugin(
description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer",
outputPorts = {
@OutputPort(
name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class },
description = "Provides each incoming object") },
configuration = {
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"),
@Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest")
})
public class RabbitMQConnector extends AbstractFilterPlugin {
/**
* The name of the input port receiving the incoming events.
*/
public static final String INPUT_PORT_NAME_EVENTS = "inputEvents";
/**
* The name of the output port passing the incoming events.
*/
public static final String OUTPUT_PORT_NAME = "relayedEvents";
/**
* The name of the property determining the address of the used Server.
*/
public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl";
/**
* The name of the property determining the name of the Queue.
*/
public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName";
/**
* The name of the property determining the username used to connect to a queue.
*/
public static final String CONFIG_PROPERTY_USER = "username";
/**
* The name of the property determining the password used to connect to a queue.
*/
public static final String CONFIG_PROPERTY_PASSWORD = "password";
private static final Log LOG = LogFactory.getLog(RabbitMQConnector.class);
private final String provider;
private final String queue;
private final String password;
private final String username;
private Connection connection;
private Channel channel;
private final ConnectionFactory factory;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this plugin
*
* @deprecated To be removed in Kieker 1.8.
*/
@Deprecated
public RabbitMQConnector(final Configuration configuration) {
this(configuration, null);
}
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public RabbitMQConnector(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
this.provider = configuration.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER);
this.queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE);
this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER);
this.password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD);
this.factory = new ConnectionFactory();
this.factory.setHost(this.provider);
this.factory.setConnectionTimeout(0);
this.factory.setUsername(this.username);
this.factory.setPassword(this.password);
try {
this.connection = this.factory.newConnection();
this.channel = this.connection.createChannel();
this.channel.queueDeclare(this.queue, false, false, false, null);
} catch (final IOException e) {
LOG.info("Error establishing connection", e);
}
LOG.info("Sending to destination:" + this.queue + " at " + this.provider + " !\n***\n\n");
}
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, this.provider);
configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queue);
configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password);
configuration.setProperty(CONFIG_PROPERTY_USER, this.username);
return configuration;
}
/**
* This method represents the input port of this filter.
*
* @param event
* The next event.
*/
@InputPort(
name = INPUT_PORT_NAME_EVENTS,
eventTypes = { Object.class },
description = "Receives incoming objects to be forwarded to a queue")
public final void inputEvent(final Object event) {
super.deliver(OUTPUT_PORT_NAME, event);
final ByteArrayOutputStream boas = new ByteArrayOutputStream();
ObjectOutputStream out;
try {
out = new ObjectOutputStream(boas);
out.writeObject(event);
out.close();
final byte[] message = boas.toByteArray();
if (!this.connection.isOpen() || !this.channel.isOpen()) {
this.reconnect();
}
this.channel.basicPublish("", this.queue, null, message);
} catch (final IOException e) {
RabbitMQConnector.LOG.error("Error sending event", e);
}
}
/**
* Establishes a connection to a RabbitMQ channel using the current connection information.
*/
private void reconnect() {
try {
this.connection = this.factory.newConnection();
this.channel = this.connection.createChannel();
this.channel.queueDeclare(this.queue, false, false, false, null);
} catch (final IOException e) {
RabbitMQConnector.LOG.error("Error reestablishing connection", e);
}
}
}
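/*
 * Illustrative sketch, not part of the commit: the connector above ships every
 * event as a Java-serialized byte array via basicPublish. A consumer on the
 * other side of the queue would reverse the ObjectOutputStream step roughly
 * like this; "message" stands for the byte[] body delivered by the RabbitMQ
 * client, and the class name is made up for this example.
 */
class ConnectorPayloadDecodeSketch {
    static Object decode(final byte[] message) throws java.io.IOException, ClassNotFoundException {
        final java.io.ObjectInputStream in = new java.io.ObjectInputStream(
                new java.io.ByteArrayInputStream(message));
        try {
            return in.readObject(); // the event that was written in inputEvent(..)
        } finally {
            in.close();
        }
    }
}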
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.plugin.filter.flow;
import java.io.Serializable;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.WorkflowRecord;
import kieker.common.record.flow.trace.WorkflowTrace;
/**
* This filter merges partial traces with the same trace id into one complete trace.
* Incomplete traces will be delivered after a specified timeout.
*
* @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels
*
* @since 1.8
*/
@Plugin(name = "Final Trace Reconstruction Filter (Workflow)",
description = "This filter merges partial WorkflowTraces into complete traces.",
outputPorts = {
@OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_VALID_TRACES,
description = "Forwards valid traces",
eventTypes = { WorkflowTrace.class }),
@OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_INVALID_TRACES,
description = "Forwards invalid traces",
eventTypes = { WorkflowTrace.class })
},
configuration = {
@Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT,
defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
@Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT,
defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME),
@Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION,
defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME)
})
public class WorkflowRecordFinalTraceReconstructionFilter extends AbstractFilterPlugin {
/**
* The name of the output port delivering the valid traces.
*/
public static final String OUTPUT_PORT_NAME_VALID_TRACES = "validTraces";
/**
* The name of the output port delivering the invalid traces.
*/
public static final String OUTPUT_PORT_NAME_INVALID_TRACES = "invalidTraces";
/**
* The name of the input port receiving the trace records.
*/
public static final String INPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces";
/**
* The name of the time trigger input port.
*/
public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp";
/**
* The name of the property determining the time unit.
*/
public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
/**
* The name of the property determining the maximal trace duration, the time
* this filter waits for new partial traces of a trace.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration";
/**
* The name of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout";
/**
* The default value of the properties for the maximal trace duration and timeout.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE)
/**
* The default value of the time unit property (nanoseconds).
*/
public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
private static final Log LOG = LogFactory.getLog(WorkflowRecordFinalTraceReconstructionFilter.class);
private final TimeUnit timeunit;
private final long maxTraceDuration;
private final long maxTraceTimeout;
private final ConcurrentMap<Long, TraceBuffer> traceId2trace;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public WorkflowRecordFinalTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS
final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
TimeUnit recordTimeunit;
try {
recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
} catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen
LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead.");
recordTimeunit = TimeUnit.NANOSECONDS;
}
this.timeunit = recordTimeunit;
} else {
this.timeunit = TimeUnit.NANOSECONDS;
}
final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
TimeUnit configTimeunit;
try {
configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
} catch (final IllegalArgumentException ex) {
LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead.");
configTimeunit = this.timeunit;
}
this.maxTraceDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION), configTimeunit);
this.maxTraceTimeout = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT), configTimeunit);
this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>();
}
/**
* This method is the input port for the timeout.
*
* @param timestamp
* The actual nanotime
*/
@InputPort(
name = INPUT_PORT_NAME_TIME_EVENT,
description = "Input port for periodic a time signal",
eventTypes = { Long.class })
public void newEvent(final Long timestamp) {
synchronized (this) {
for (final TraceBuffer traceBuffer : this.traceId2trace.values()) {
final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp();
final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp();
if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max duration or timeout exceeded
super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace());
// Concurrent modification allowed by ConcurrentMap
this.traceId2trace.remove(traceBuffer.getTraceId());
}
}
}
}
/**
* This method is the input port for the new events for this filter.
*
* @param record
* The new record to handle.
*/
@InputPort(
name = INPUT_PORT_NAME_PARTIAL_TRACES,
description = "Input port for partial traces from WorkflowRecordPartialTraceReconstructionFilters",
eventTypes = { WorkflowTrace.class })
public void newEvent(final IFlowRecord record) {
final Long traceId;
TraceBuffer traceBuffer;
final WorkflowTrace trace;
if (record instanceof WorkflowTrace) {
trace = (WorkflowTrace) record;
if (trace.isComplete()) {
super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, trace); // Nothing to do here
return;
} else {
traceId = trace.getTraceId();
synchronized (this) {
traceBuffer = this.getTraceBuffer(traceId);
traceBuffer.insertTrace(trace);
}
}
} else {
LOG.error("Invalid input type at " + INPUT_PORT_NAME_PARTIAL_TRACES
+ " in WorkflowRecordFinalTraceReconstructionFilter");
return; // invalid type
}
synchronized (this) {
if (traceBuffer.isComplete()) {
this.traceId2trace.remove(traceId);
super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, traceBuffer.toWorkflowTrace());
}
}
}
private TraceBuffer getTraceBuffer(final Long traceId) {
TraceBuffer traceBuffer;
traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer == null) { // NOCS (DCL)
final TraceBuffer newTraceBuffer = new TraceBuffer(this.timeunit);
traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer);
if (traceBuffer == null) {
traceBuffer = newTraceBuffer;
}
}
}
}
return traceBuffer;
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
synchronized (this) {
for (final TraceBuffer traceBuffer : this.traceId2trace.values()) {
super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace());
}
this.traceId2trace.clear();
}
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name());
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, String.valueOf(this.maxTraceDuration));
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, String.valueOf(this.maxTraceTimeout));
return configuration;
}
/**
* Buffer for events from partial traces that will be turned into a single trace.
*
* @author Florian Biss
*/
private static final class TraceBuffer {
private static final Log LOG = LogFactory.getLog(TraceBuffer.class);
private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator();
private final SortedSet<AbstractTraceEvent> events = new TreeSet<AbstractTraceEvent>(COMPARATOR);
private boolean damaged;
private int maxOrderIndex = -1;
private final TimeUnit timeunit;
private long minLoggingTimestamp = Long.MAX_VALUE;
private long maxLoggingTimestamp = -1;
private long traceId = -1;
private boolean hasStart;
private boolean hasEnd;
/**
* Creates a new buffer.
*
* @param timeunit
* TimetUnit used for logging timestamps.
*/
public TraceBuffer(final TimeUnit timeunit) {
this.timeunit = timeunit;
}
/**
* Store all events of a partial trace in the buffer.
*
* @param partialTrace
* A partial trace
*/
public void insertTrace(final WorkflowTrace partialTrace) {
final long myTraceId = partialTrace.getTraceId();
// Time information in partial traces is old (partial traces timed out on a worker),
// use current system time instead.
final long loggingTimestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
synchronized (this) {
if (this.traceId == -1) {
this.traceId = myTraceId;
this.minLoggingTimestamp = loggingTimestamp;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in trace " + partialTrace.toString());
this.damaged = true;
}
this.maxLoggingTimestamp = loggingTimestamp;
final WorkflowRecord[] newEvents = partialTrace.getTraceEvents();
for (final WorkflowRecord event : newEvents) {
this.insertEvent(event);
}
if (partialTrace.isDamaged()) {
this.damaged = true;
}
}
}
private void insertEvent(final WorkflowRecord event) {
final int orderIndex = event.getOrderIndex();
if (orderIndex > this.maxOrderIndex) {
this.maxOrderIndex = orderIndex;
}
if (event.isStart()) {
if (this.hasStart) {
LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString());
this.damaged = true;
}
this.hasStart = true;
}
if (event.isEnd()) {
if (this.hasEnd) {
LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString());
this.damaged = true;
}
this.hasEnd = true;
}
if (!this.events.add(event)) {
LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + this.traceId);
this.damaged = true;
}
}
/**
* @return
* <code>true</code> if all records are present and the trace has a start and an end record.
*/
public boolean isComplete() {
synchronized (this) {
return ((this.maxOrderIndex + 1) == this.events.size()) && !this.events.isEmpty()
&& this.hasEnd && this.hasStart && !this.damaged;
}
}
/**
* @return <code>true</code> if the trace in this buffer is damaged.
*/
public boolean isDamaged() {
synchronized (this) {
return this.damaged;
}
}
/**
* @return The trace id
*/
public long getTraceId() {
synchronized (this) {
return this.traceId;
}
}
/**
* Process this buffer into a WorkflowTrace containing all buffered events.
*
* @return A new WorkflowTrace
*/
public WorkflowTrace toWorkflowTrace() {
synchronized (this) {
return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]),
this.isComplete(), this.isDamaged());
}
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return this.toWorkflowTrace().toString();
}
/**
* @return Insertion timestamp of the latest event in this buffer
*/
public long getMaxLoggingTimestamp() {
synchronized (this) {
return this.maxLoggingTimestamp;
}
}
/**
* @return Insertion timestamp of the first event in this buffer
*/
public long getMinLoggingTimestamp() {
synchronized (this) {
return this.minLoggingTimestamp;
}
}
/**
* Compares two trace events by their order index.
*
* @author Jan Waller
*/
private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable {
private static final long serialVersionUID = 89207356648232517L;
/**
* Creates a new instance of this class.
*/
public TraceEventComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
}
}
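/*
 * Illustrative sketch, not part of the commit: a minimal configuration for the
 * filter above. Both limits are interpreted in the configured time unit; with
 * the values below an open trace is flushed as invalid once no partial trace
 * arrived for 2 seconds, or once the trace as a whole is older than 10 seconds.
 * The wiring into an AnalysisController mirrors the Xtend WorkerController
 * earlier in this commit.
 */
class FinalReconstructionConfigSketch {
    static kieker.common.configuration.Configuration timeoutConfig() {
        final kieker.common.configuration.Configuration config = new kieker.common.configuration.Configuration();
        config.setProperty(WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, "NANOSECONDS");
        // Flush a trace when no new partial trace arrived for 2s ...
        config.setProperty(WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, "2000000000");
        // ... or when the trace has been buffered for more than 10s in total.
        config.setProperty(WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, "10000000000");
        return config;
    }
}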
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.plugin.filter.flow;
import java.io.Serializable;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.WorkflowRecord;
import kieker.common.record.flow.trace.WorkflowTrace;
/**
* This filter collects WorkflowRecords and constructs ordered traces of them.
*
*
* @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels
*
* @since 1.8
*/
@Plugin(name = "Partial Trace Reconstruction Filter (Workflow)",
description = "This filter bundles WorkflowRecords into a trace",
outputPorts = {
@OutputPort(name = WorkflowRecordPartialTraceReconstructionFilter.OUTPUT_PORT_NAME_PARTIAL_TRACES,
description = "Forwards the constructed partial and complete traces",
eventTypes = { WorkflowTrace.class })
},
configuration = {
@Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS,
defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS),
@Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS,
defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS)
})
public class WorkflowRecordPartialTraceReconstructionFilter extends AbstractFilterPlugin {
/**
* The name of the output port delivering the partial and complete traces.
*/
public static final String OUTPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces";
/**
* The name of the input port receiving the trace records.
*/
public static final String INPUT_PORT_NAME_WORKFLOW_RECORDS = "workflowRecords";
/**
* The name of the input port receiving the time stamps.
*/
public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp";
/**
* The name of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS = "maxTraceTimeout";
/**
* The default value of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS = "1000000000"; // 1 second
/**
* The name of the property determining the maximal trace duration.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS = "maxTraceDuration";
/**
* The default value of the property determining the maximal trace duration.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS = "5000000000"; // 5 seconds
private static final Log LOG = LogFactory.getLog(WorkflowRecordPartialTraceReconstructionFilter.class);
private final ConcurrentMap<Long, TraceBuffer> traceId2trace;
private final long maxTraceTimeout;
private final long maxTraceDuration;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public WorkflowRecordPartialTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>();
this.maxTraceTimeout = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS);
this.maxTraceDuration = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS);
}
/**
* This method is the input port for the timeout.
*
* @param timestamp
* The actual nanotime
*/
@InputPort(
name = INPUT_PORT_NAME_TIME_EVENT,
description = "Input port for periodic a time signal",
eventTypes = { Long.class })
public void newEvent(final Long timestamp) {
synchronized (this) {
for (final TraceBuffer traceBuffer : this.traceId2trace.values()) {
final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp();
final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp();
if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max timeout or duration exceeded
this.traceId2trace.remove(traceBuffer.getTraceID());
super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace());
}
}
}
}
/**
* This method is the input port for the new events for this filter.
*
* @param record
* The new record to handle.
*/
@InputPort(
name = INPUT_PORT_NAME_WORKFLOW_RECORDS,
description = "Input port for WorkflowRecords",
eventTypes = { WorkflowRecord.class })
public void newEvent(final IFlowRecord record) {
final Long traceId;
TraceBuffer traceBuffer;
if (record instanceof WorkflowRecord) {
final WorkflowRecord traceRecord = (WorkflowRecord) record;
traceId = traceRecord.getTraceId();
traceBuffer = this.getTraceBuffer(traceId);
synchronized (this) {
traceBuffer.insertEvent(traceRecord);
if (traceBuffer.hasEnd()) {
this.traceId2trace.remove(traceId);
super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace());
}
}
} else {
LOG.error("Invalid event type in WorkflowRecordPartialTraceReconstructionFilter");
return; // invalid type which should not happen due to the specified eventTypes
}
}
private TraceBuffer getTraceBuffer(final Long traceId) {
TraceBuffer traceBuffer;
traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer == null) { // NOCS (DCL)
final TraceBuffer newTraceBuffer = new TraceBuffer();
traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer);
if (traceBuffer == null) {
traceBuffer = newTraceBuffer;
}
}
}
}
return traceBuffer;
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
this.deliverAllBuffer();
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS, String.valueOf(this.maxTraceTimeout));
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS, String.valueOf(this.maxTraceDuration));
return configuration;
}
/**
* This method delivers all traceBuffer and clears the buffer.
*/
private void deliverAllBuffer() {
synchronized (this) {
for (final TraceBuffer traceBuffer : this.traceId2trace.values()) {
super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace());
}
this.traceId2trace.clear();
}
}
/**
* Buffer for records that will be bundled into a single trace.
*
* @author Florian Biss
*/
private static final class TraceBuffer {
private static final Log LOG = LogFactory.getLog(TraceBuffer.class);
private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator();
private final SortedSet<WorkflowRecord> events = new TreeSet<WorkflowRecord>(COMPARATOR);
private boolean damaged;
private int maxOrderIndex = -1;
private long minLoggingTimestamp = Long.MAX_VALUE;
private long maxLoggingTimestamp = -1;
private long traceId = -1;
private boolean hasStart;
private boolean ended;
/**
* Creates a new buffer.
*/
public TraceBuffer() {
// default empty constructor
}
/**
* @return The trace id
*/
public long getTraceID() {
synchronized (this) {
return this.traceId;
}
}
/**
* Insert a new event into buffer.
*
* @param event
* New event
*/
public void insertEvent(final WorkflowRecord event) {
final long myTraceId = event.getTraceId();
synchronized (this) {
final long currentTime = System.nanoTime();
if (this.traceId == -1) {
this.traceId = myTraceId;
this.minLoggingTimestamp = currentTime;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in event " + event.toString());
this.damaged = true;
}
this.maxLoggingTimestamp = currentTime;
final int orderIndex = event.getOrderIndex();
if (orderIndex > this.maxOrderIndex) {
this.maxOrderIndex = orderIndex;
}
if (event.isStart()) {
if (this.hasStart) {
LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString());
this.damaged = true;
}
this.hasStart = true;
}
if (event.isEnd()) {
if (this.ended) {
LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString());
this.damaged = true;
}
this.ended = true;
}
if (!this.events.add(event)) {
LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + myTraceId);
this.damaged = true;
}
}
}
/**
* @return <code>true</code> if this buffer contains a trace end record
*/
public boolean hasEnd() {
synchronized (this) {
return this.ended;
}
}
/**
* @return <code>true</code> if this buffer contains all records of a trace, including start and end record
*/
public boolean isComplete() {
synchronized (this) {
return ((this.maxOrderIndex + 1) == this.events.size())
&& !this.events.isEmpty() && this.ended && this.hasStart;
}
}
/**
* @return <code>true</code> if the trace in this buffer is damaged
*/
public boolean isDamaged() {
synchronized (this) {
return this.damaged;
}
}
/**
* Process buffer into a trace.
*
* @return
* A new trace containing the buffered events
*/
public WorkflowTrace toWorkflowTrace() {
return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]),
this.isComplete(), this.isDamaged());
}
/**
* @return Youngest time stamp in trace
*/
public long getMaxLoggingTimestamp() {
synchronized (this) {
return this.maxLoggingTimestamp;
}
}
/**
* @return Oldest time stamp in trace
*/
public long getMinLoggingTimestamp() {
synchronized (this) {
return this.minLoggingTimestamp;
}
}
/**
* @author Jan Waller
*/
private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable {
private static final long serialVersionUID = 89207356648232517L;
/**
* Creates a new instance of this class.
*/
public TraceEventComperator() {
// default empty constructor
}
public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
}
}
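/*
 * Illustrative sketch, not part of the commit: getTraceBuffer above resolves
 * the create-if-absent race with a double check inside a synchronized block.
 * On a ConcurrentMap the same guarantee can be expressed without locking via
 * putIfAbsent, which returns the buffer of whichever thread won the race; the
 * helper below is a generic version of that idiom.
 */
class TraceBufferLookupSketch {
    static <B> B lookupOrCreate(final java.util.concurrent.ConcurrentMap<Long, B> map,
            final Long traceId, final B freshBuffer) {
        final B racedBuffer = map.putIfAbsent(traceId, freshBuffer);
        // putIfAbsent returns null when freshBuffer was actually inserted.
        return (racedBuffer != null) ? racedBuffer : freshBuffer;
    }
}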
/***************************************************************************
* Copyright 2013 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.plugin.filter.flow;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.WorkflowRecord;
import kieker.common.record.flow.trace.WorkflowTrace;
import kieker.common.util.StatisticInformation;
/**
* This filter collects incoming traces for a specified amount of time.
* All traces representing the same series of events are used to calculate
* statistic information such as the average runtime of this kind of trace.
* Only one specimen of these traces, carrying this information, is forwarded
* by this filter.
*
* Statistic outliers regarding the runtime of a trace are treated specially:
* they are sent out as they are and are not merged with others.
*
*
* @author Florian Biss
*
* @since 1.8
*/
@Plugin(description = "This filter tries to agglomerate similar WorkflowTraces into a single trace.",
outputPorts = {
@OutputPort(name = WorkflowRecordTraceAgglomerationFilter.OUTPUT_PORT_NAME_TRACES,
description = "Output port for the processed traces",
eventTypes = { WorkflowTrace.class })
},
configuration = {
@Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT,
defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
@Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION,
defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION),
@Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION,
defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION)
})
public class WorkflowRecordTraceAgglomerationFilter extends AbstractFilterPlugin {
/**
* The name of the output port delivering the processed traces.
*/
public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut";
/**
* The name of the input port receiving the trace records.
*/
public static final String INPUT_PORT_NAME_TRACES = "tracesIn";
/**
* The name of the property determining the time unit.
*/
public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
/**
* Clock input for timeout handling.
*/
public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp";
/**
* The default value of the time unit property (nanoseconds).
*/
public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
/**
* The name of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration";
/**
* The default value of the property determining the maximal trace timeout.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "5000000000";
/**
* The name of the property determining the maximal runtime deviation factor.
*
* Outliers are indicated by <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code>.
* Use negative number to agglomerate all traces.
*/
public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation";
/**
* The default value of the property determining the maximal runtime deviation factor.
* Default is two standard deviations.
*/
public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "2";
private static final Log LOG = LogFactory.getLog(WorkflowRecordTraceAgglomerationFilter.class);
private final TimeUnit timeunit;
private final long maxCollectionDuration;
private final long maxDeviation;
private final Map<WorkflowTrace, TraceAgglomerationBuffer> trace2buffer;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public WorkflowRecordTraceAgglomerationFilter(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS
final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
TimeUnit recordTimeunit;
try {
recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
} catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen
LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead.");
recordTimeunit = TimeUnit.NANOSECONDS;
}
this.timeunit = recordTimeunit;
} else {
this.timeunit = TimeUnit.NANOSECONDS;
}
this.maxDeviation = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION);
final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
TimeUnit configTimeunit;
try {
configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
} catch (final IllegalArgumentException ex) {
LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead.");
configTimeunit = this.timeunit;
}
this.maxCollectionDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION), configTimeunit);
this.trace2buffer = new TreeMap<WorkflowTrace, TraceAgglomerationBuffer>(new TraceComperator());
}
/**
* This method is the input port for incoming traces.
*
* @param record
* A WorkflowTrace
*/
@InputPort(
name = INPUT_PORT_NAME_TRACES,
description = "Collect identical traces and agglomerate them.",
eventTypes = { WorkflowTrace.class })
public void newEvent(final IFlowRecord event) {
final WorkflowTrace trace;
synchronized (this) {
if (event instanceof WorkflowTrace) {
trace = (WorkflowTrace) event;
if (!trace.isComplete() || trace.isDamaged()) {
super.deliver(OUTPUT_PORT_NAME_TRACES, trace); // Incomplete or damaged? Nothing to do here.
return;
} else {
this.insertIntoBuffer(trace);
}
} else {
LOG.error("Invalid input type at " + OUTPUT_PORT_NAME_TRACES
+ " in WorkflowRecordTraceAgglomerationFilter");
return; // invalid type
}
}
}
/**
* Inserts a WorkflowTrace into the buffer.
*
* @param trace
* The WorkflowTrace that will be inserted
*/
private void insertIntoBuffer(final WorkflowTrace trace) {
TraceAgglomerationBuffer traceBuffer;
final long timestamp;
timestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
traceBuffer = this.trace2buffer.get(trace);
if (traceBuffer == null) { // first record for this id!
synchronized (this) {
traceBuffer = this.trace2buffer.get(trace);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAgglomerationBuffer(timestamp, this.maxDeviation);
this.trace2buffer.put(trace, traceBuffer);
}
}
}
traceBuffer.insertTrace(trace);
}
/**
* This method is the input port for the timeout.
*
* @param timestamp
* The current nanotime
*/
@InputPort(
name = INPUT_PORT_NAME_TIME_EVENT,
description = "Time signal for timeouts",
eventTypes = { Long.class })
public void newEvent(final Long timestamp) {
synchronized (this) {
this.processTimeoutQueue(timestamp);
}
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
synchronized (this) {
// Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer.
final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values());
for (final TraceAgglomerationBuffer traceBuffer : buffers) {
this.deliverBuffer(traceBuffer);
}
this.trace2buffer.clear();
}
}
private void processTimeoutQueue(final long timestamp) {
final long bufferTimeout = timestamp - this.maxCollectionDuration;
// Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer.
final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values());
for (final TraceAgglomerationBuffer traceBuffer : buffers) {
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) {
this.deliverBuffer(traceBuffer);
}
}
}
/**
* Deliver and remove a buffer.
*
* @param traceBuffer
*/
private void deliverBuffer(final TraceAgglomerationBuffer traceBuffer) {
final List<WorkflowTrace> traces = traceBuffer.processBuffer();
for (final WorkflowTrace trace : traces) {
super.deliver(OUTPUT_PORT_NAME_TRACES, trace);
}
// Kill the buffer
if (this.trace2buffer.remove(traces.get(0)) == null) {
LOG.warn("Removal of buffer failed.");
}
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name());
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, String.valueOf(this.maxCollectionDuration));
configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION, String.valueOf(this.maxDeviation));
return configuration;
}
/**
* Buffer for similar traces that are to be agglomerated into a single trace.
*
* @author Florian Biß
*/
private static final class TraceAgglomerationBuffer {
private static final Log LOG = LogFactory.getLog(TraceAgglomerationBuffer.class);
/** Contains all buffered traces for statistical purposes. */
private final List<WorkflowTrace> traces = new ArrayList<WorkflowTrace>();
/** Contains each unique trace at most once. */
private WorkflowTrace agglomeratedTraces;
private final long bufferCreatedTimestamp;
/**
* Maximal runtime deviation factor. Use negative number to agglomerate all traces.
*
* Outliers are indicated by <code>averageRuntime - deviationFactor * standardDeviation > runtime</code> or
* <code>runtime > averageRuntime + deviationFactor * standardDeviation</code>
*
*/
private final long deviationFactor;
/**
* Creates a new instance of this class.
*/
public TraceAgglomerationBuffer(final long bufferCreatedTimestamp, final long maxDeviation) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
this.deviationFactor = maxDeviation;
}
/**
* Insert a trace into this buffer.
*
* @param trace
* The trace to insert
*/
public void insertTrace(final WorkflowTrace trace) {
if (LOG.isDebugEnabled()) {
LOG.debug("Inserting into AgglomerationBuffer: " + trace.toString());
}
synchronized (this) {
this.traces.add(trace);
}
}
/**
* Agglomerate all traces that can and should be agglomerated.
*
* @return List of agglomerated traces and statistic outliers.
*/
public List<WorkflowTrace> processBuffer() {
final List<WorkflowTrace> processed = new ArrayList<WorkflowTrace>();
final StatisticInformation tmpRuntime;
synchronized (this) {
tmpRuntime = this.getTempBufferStatistic();
for (final WorkflowTrace trace : this.traces) {
// If deviationFactor is negative do not care about outliers.
if ((this.deviationFactor > 0) && this.isOutlier(tmpRuntime, trace.getRuntime())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Statistical outlier: " + trace.toString());
}
// Outlier. Do not agglomerate.
processed.add(trace);
} else {
// Add to agglomeratedTraces.
this.agglomerate(trace);
}
}
processed.add(this.agglomeratedTraces);
}
return processed;
}
/**
* Merge a trace with the agglomerated traces.
*
* @param trace
* Trace to agglomerate
*/
private void agglomerate(final WorkflowTrace trace) {
if (this.agglomeratedTraces == null) {
// Trace is new, add to map.
this.agglomeratedTraces = trace;
} else {
// Trace exists, merge statistics for every record.
final WorkflowRecord[] agglomeratedRecords = this.agglomeratedTraces.getTraceEvents();
final WorkflowRecord[] records = trace.getTraceEvents();
for (int i = 0; i < agglomeratedRecords.length; i++) {
agglomeratedRecords[i].getRuntime().merge(records[i].getRuntime());
}
this.agglomeratedTraces.getRuntime().merge(trace.getRuntime());
}
}
/**
* Detect outliers.
*
* @param averagedRuntimes
* Statistics to check against
* @param recordRuntime
* Data to check
* @return
* <code>true</code> if both are too different
*/
private boolean isOutlier(final StatisticInformation averagedRuntimes, final StatisticInformation recordRuntime) {
final long maxDeviation = averagedRuntimes.getStandardDeviation() * this.deviationFactor;
if (maxDeviation < 0) {
return true;
}
return Math.abs(recordRuntime.getAvg() - averagedRuntimes.getAvg()) > maxDeviation;
}
/**
* @return Creation time of this buffer.
*/
public long getBufferCreatedTimestamp() {
synchronized (this) {
return this.bufferCreatedTimestamp;
}
}
/**
* The statistical runtime information about this buffer so far.
*
* @return Runtime information.
*/
private StatisticInformation getTempBufferStatistic() {
final StatisticInformation tmpStatistic = new StatisticInformation();
for (final WorkflowTrace trace : this.traces) {
tmpStatistic.merge(trace.getRuntime());
}
return tmpStatistic;
}
}
/**
* Compares traces based on their content instead of traceIds.
* Order of comparisons: process name -> trace length -> id of each node in the trace.
* If all of these match, the traces are considered similar and may be agglomerated, although their trace ids and runtimes may differ.
*
* @author Florian Biß
*
*/
private static final class TraceComperator implements Comparator<WorkflowTrace>, Serializable {
private static final long serialVersionUID = 8920766818232517L;
private static final Log LOG = LogFactory.getLog(TraceComperator.class);
/**
* Creates a new instance of this class.
*/
public TraceComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
public int compare(final WorkflowTrace t1, final WorkflowTrace t2) {
final int compProcesses = t1.getProcessName().compareTo(t2.getProcessName());
if (compProcesses != 0) {
return compProcesses;
}
final WorkflowRecord[] recordsT1 = t1.getTraceEvents();
final WorkflowRecord[] recordsT2 = t2.getTraceEvents();
if ((recordsT1.length - recordsT2.length) != 0) {
return recordsT1.length - recordsT2.length;
}
// Records in traces are already sorted by orderIndex, only compare nodeIds.
for (int i = 0; i < recordsT1.length; i++) {
final WorkflowRecord r1 = recordsT1[i];
final WorkflowRecord r2 = recordsT2[i];
final long idDiff = r1.getNodeId() - r2.getNodeId();
if ((idDiff > Integer.MAX_VALUE) || (idDiff < Integer.MIN_VALUE)) {
LOG.warn("Overflow during thread comparison!");
}
if (idDiff != 0) {
return (int) (r1.getNodeId() - r2.getNodeId());
}
}
// All records match.
return 0;
}
}
}
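/*
 * Illustrative sketch, not part of the commit: the outlier test above flags a
 * trace when |runtime - averageRuntime| > deviationFactor * standardDeviation.
 * Worked numbers: with an average of 100ms, a standard deviation of 10ms and
 * the default factor of 2, runtimes outside [80ms, 120ms] are delivered
 * unmerged.
 */
class OutlierCriterionSketch {
    static boolean isOutlier(final long avg, final long stdDev, final long deviationFactor, final long runtime) {
        return Math.abs(runtime - avg) > (deviationFactor * stdDev);
    }

    public static void main(final String[] args) {
        System.out.println(isOutlier(100, 10, 2, 125)); // true: |125 - 100| > 20
        System.out.println(isOutlier(100, 10, 2, 112)); // false: |112 - 100| <= 20
    }
}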
package kieker.analysis.plugin.reader.mq;
public class Bits {
static boolean getBoolean(byte[] b, int off) {
return b[off] != 0;
}
static char getChar(byte[] b, int off) {
return (char) ((b[off + 1] & 0xFF) +
(b[off] << 8));
}
static short getShort(byte[] b, int off) {
return (short) ((b[off + 1] & 0xFF) +
(b[off] << 8));
}
static int getInt(byte[] b, int off) {
return ((b[off + 3] & 0xFF) ) +
((b[off + 2] & 0xFF) << 8) +
((b[off + 1] & 0xFF) << 16) +
((b[off ] ) << 24);
}
static float getFloat(byte[] b, int off) {
return Float.intBitsToFloat(getInt(b, off));
}
static long getLong(byte[] b, int off) {
return ((b[off + 7] & 0xFFL) ) +
((b[off + 6] & 0xFFL) << 8) +
((b[off + 5] & 0xFFL) << 16) +
((b[off + 4] & 0xFFL) << 24) +
((b[off + 3] & 0xFFL) << 32) +
((b[off + 2] & 0xFFL) << 40) +
((b[off + 1] & 0xFFL) << 48) +
(((long) b[off]) << 56);
}
static double getDouble(byte[] b, int off) {
return Double.longBitsToDouble(getLong(b, off));
}
public static byte getByte(byte[] b, int off) {
return b[off];
}
static void putBoolean(byte[] b, int off, boolean val) {
b[off] = (byte) (val ? 1 : 0);
}
static void putChar(byte[] b, int off, char val) {
b[off + 1] = (byte) (val );
b[off ] = (byte) (val >>> 8);
}
static void putShort(byte[] b, int off, short val) {
b[off + 1] = (byte) (val );
b[off ] = (byte) (val >>> 8);
}
static void putInt(byte[] b, int off, int val) {
b[off + 3] = (byte) (val );
b[off + 2] = (byte) (val >>> 8);
b[off + 1] = (byte) (val >>> 16);
b[off ] = (byte) (val >>> 24);
}
static void putFloat(byte[] b, int off, float val) {
putInt(b, off, Float.floatToIntBits(val));
}
static void putLong(byte[] b, int off, long val) {
b[off + 7] = (byte) (val );
b[off + 6] = (byte) (val >>> 8);
b[off + 5] = (byte) (val >>> 16);
b[off + 4] = (byte) (val >>> 24);
b[off + 3] = (byte) (val >>> 32);
b[off + 2] = (byte) (val >>> 40);
b[off + 1] = (byte) (val >>> 48);
b[off ] = (byte) (val >>> 56);
}
static void putDouble(byte[] b, int off, double val) {
putLong(b, off, Double.doubleToLongBits(val));
}
public static void putByte(byte[] b, int off, byte val) {
b[off] = val;
}
}