Commit da2b7bd8 authored by Florian Fittkau

4.5 million records per second :)

parent fe0abf85
@@ -6,5 +6,11 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/>
<classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT_fast.jar"/>
<classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"/>
<classpathentry kind="lib" path="lib/javolution-core-java-6.0.0-20130804.010711-11.jar">
<attributes>
<attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="bin"/>
</classpath>
File added
File added
File added
File deleted
@@ -16,170 +16,25 @@
package explorviz.hpc_monitoring.filter;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
// import kieker.common.util.ImmutableEntry;
/**
* An instance of this class computes the throughput in terms of the number of
* events received per time unit.
*
* Note that only one of the input ports should be used in a configuration!
*
* @author Andre van Hoorn, Jan Waller
*
* @since 1.6
*/
@Plugin(description = "A filter computing the throughput in terms of the number of events received per time unit", outputPorts = {
@OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, description = "Provides each incoming object"),
@OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, description = "Provides throughput per interval") }, configuration = {
@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT),
@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE),
@Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, defaultValue = "true") })
public final class CountingThroughputFilter extends AbstractFilterPlugin {
/**
* The name of the input port receiving the records.
*/
public static final String INPUT_PORT_NAME_RECORDS = "inputRecords";
/**
* The name of the input port receiving other objects.
*/
public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects";
/**
* The name of the output port delivering the received objects.
*/
public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents";
public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval";
/** The name of the property determining the time unit. */
public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit";
/** The name of the property determining the interval size. */
public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize";
/**
* The default value of the time unit property (nanoseconds).
*/
public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name()
/**
* If the value is set to false, the intervals are computed based on time
* since 1970-1-1.
*/
public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp";
/**
* The configuration property value for
* {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1
* minute.
*/
public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000";
private static final Log LOG = LogFactory
.getLog(CountingThroughputFilter.class);
public final class CountingThroughputFilter {
private volatile long firstIntervalStart = -1;
private final boolean intervalsBasedOn1stTstamp;
private final TimeUnit timeunit;
/**
* For a key <i>k</i>, the {@link Queue} stores the number of events
* observed in the time interval <i>(k-intervalSize,k(</i>, i.e.,
* the interval <b>excludes</b> the value <i>k</i>.
*/
private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>();
private final long intervalSize;
private final AtomicLong currentCountForCurrentInterval = new AtomicLong(
0);
private final AtomicLong currentCountForCurrentInterval = new AtomicLong(0);
private volatile long firstTimestampInCurrentInterval = -1; // initialized
// with
// the
// first
// incoming
// event
private volatile long lastTimestampInCurrentInterval = -1; // initialized
// with
// the
// first
// incoming
// event
private volatile long firstTimestampInCurrentInterval = -1;
private volatile long lastTimestampInCurrentInterval = -1;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration for this component.
* @param projectContext
* The project context for this component.
*/
public CountingThroughputFilter(final Configuration configuration,
final IProjectContext projectContext) {
super(configuration, projectContext);
final String recordTimeunitProperty = projectContext
.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT);
TimeUnit recordTimeunit;
try {
recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty);
}
catch (final IllegalArgumentException ex) { // already caught in
// AnalysisController,
// should never happen
LOG.warn(recordTimeunitProperty
+ " is no valid TimeUnit! Using NANOSECONDS instead.");
recordTimeunit = TimeUnit.NANOSECONDS;
}
timeunit = recordTimeunit;
final String configTimeunitProperty = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT);
TimeUnit configTimeunit;
try {
configTimeunit = TimeUnit.valueOf(configTimeunitProperty);
}
catch (final IllegalArgumentException ex) {
LOG.warn(configTimeunitProperty
+ " is no valid TimeUnit! Using inherited value of "
+ timeunit.name() + " instead.");
configTimeunit = timeunit;
}
intervalSize = timeunit.convert(configuration
.getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE),
configTimeunit);
intervalsBasedOn1stTstamp = configuration
.getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP);
}
/**
* {@inheritDoc}
*/
@Override
public final Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT,
timeunit.name());
configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE,
Long.toString(intervalSize));
configuration.setProperty(
CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP,
Boolean.toString(intervalsBasedOn1stTstamp));
return configuration;
public CountingThroughputFilter() {
timeunit = TimeUnit.NANOSECONDS;
intervalSize = timeunit.convert(1000000000, TimeUnit.NANOSECONDS);
}
private void processEvent(final Object event, final long currentTime) {
@@ -187,33 +42,11 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin {
final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime);
synchronized (this) {
// Check if we need to close the current interval.
if (endOfTimestampsInterval > lastTimestampInCurrentInterval) {
if (firstTimestampInCurrentInterval >= 0) { // don't do this for
// the first record
// (only used for
// initialization of
// variables)
long currentCount = currentCountForCurrentInterval.get();
// this.eventCountsPerInterval.add(
// new ImmutableEntry<Long, Long>(
// this.lastTimestampInCurrentInterval + 1,
// currentCount));
super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount);
// long numIntervalsElapsed = 1; // refined below
// numIntervalsElapsed = (endOfTimestampsInterval -
// this.lastTimestampInCurrentInterval) / this.intervalSize;
// if (numIntervalsElapsed > 1) { // NOPMD
// (AvoidDeeplyNestedIfStmts)
// for (int i = 1; i < numIntervalsElapsed; i++) {
// this.eventCountsPerInterval.add(
// new ImmutableEntry<Long,
// Long>((this.lastTimestampInCurrentInterval + (i *
// this.intervalSize)) + 1, 0L));
// }
// }
if (firstTimestampInCurrentInterval >= 0) {
final long currentCount = currentCountForCurrentInterval
.get();
System.out.println(currentCount);
}
firstTimestampInCurrentInterval = startOfTimestampsInterval;
@@ -221,61 +54,19 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin {
currentCountForCurrentInterval.set(0);
}
currentCountForCurrentInterval.incrementAndGet(); // only
// incremented in
// synchronized
// blocks
}
super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event);
currentCountForCurrentInterval.incrementAndGet();
}
/**
* This method represents the input port for incoming records.
*
* @param record
* The next record.
*/
// #841 What happens with unordered events (i.e., timestamps before
// firstTimestampInCurrentInterval)?
@InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp")
public final void inputRecord(final IMonitoringRecord record) {
processEvent(record, record.getLoggingTimestamp());
}
/**
* This method represents the input port for incoming objects.
*
* @param object
* The next object.
*/
@InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, description = "Receives incoming objects to be considered for the throughput computation and uses the current system time")
public final void inputObjects(final Object object) {
processEvent(object, currentTime());
}
/**
* Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970.
*
* @return The current time
*/
private long currentTime() {
return timeunit.convert(System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
// #840 is this correct? it probably makes more sense to provide a copy.
public Collection<Entry<Long, Long>> getCountsPerInterval() {
return Collections.unmodifiableCollection(eventCountsPerInterval);
}
/**
* Returns the first timestamp included in the interval that corresponds to
* the given timestamp.
*
* @param timestamp
*
* @return The timestamp in question.
*/
private long computeFirstTimestampInInterval(final long timestamp) {
final long referenceTimePoint;
@@ -283,64 +74,17 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin {
firstIntervalStart = timestamp;
}
if (intervalsBasedOn1stTstamp) {
referenceTimePoint = firstIntervalStart;
}
else {
referenceTimePoint = 0;
}
return referenceTimePoint
+ (((timestamp - referenceTimePoint) / intervalSize) * intervalSize);
}
/**
* Returns the last timestamp included in the interval that corresponds to
* the given timestamp.
*
* @param timestamp
* @return The timestamp in question.
*/
private long computeLastTimestampInInterval(final long timestamp) {
final long referenceTimePoint;
if (intervalsBasedOn1stTstamp) {
referenceTimePoint = firstIntervalStart;
}
else {
referenceTimePoint = 0;
}
return referenceTimePoint
+ (((((timestamp - referenceTimePoint) / intervalSize) + 1) * intervalSize) - 1);
}
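// Worked example (hypothetical values; assumes intervalsBasedOn1stTstamp = true and
// intervalSize = 1_000_000_000 ns, i.e. the one-second interval set in the new constructor):
//   with firstIntervalStart = 5_000_000_000 and timestamp = 6_300_000_000,
//   computeFirstTimestampInInterval -> 5_000_000_000 + ((1_300_000_000 / 1_000_000_000) * 1_000_000_000) = 6_000_000_000
//   computeLastTimestampInInterval  -> 5_000_000_000 + (((1_300_000_000 / 1_000_000_000) + 1) * 1_000_000_000) - 1 = 6_999_999_999
// so the timestamp 6.3 s falls into the interval [6.0 s, 7.0 s).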
/**
* @return the intervalSize
*/
public long getIntervalSize() {
return intervalSize;
}
/**
* @return the firstTimestampInCurrentInterval, or -1 if no record has been processed so far
*/
public long getFirstTimestampInCurrentInterval() {
return firstTimestampInCurrentInterval;
}
/**
* @return the lastTimestampInCurrentInterval, or -1 if no record has been processed so far
*/
public long getLastTimestampInCurrentInterval() {
return lastTimestampInCurrentInterval;
}
/**
* @return the currentCountForCurrentInterval
*/
public long getCurrentCountForCurrentInterval() {
return currentCountForCurrentInterval.get();
}
}
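A minimal sketch of how the slimmed-down filter could be driven directly, assuming the inputObjects(Object) entry point shown above is kept. The driver class and the loop bound are hypothetical and not part of this commit; once a one-second interval closes, the filter prints the count of the interval that just ended.

// Hypothetical driver (not part of this commit) exercising the simplified CountingThroughputFilter.
public class ThroughputFilterDemo {
    public static void main(final String[] args) {
        final CountingThroughputFilter filter = new CountingThroughputFilter();
        for (int i = 0; i < 10_000_000; i++) {
            // Each call counts one event against the current 1-second interval;
            // when an interval closes, the filter prints that interval's count.
            filter.inputObjects(new Object());
        }
    }
}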
@@ -20,114 +20,48 @@ import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.*;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.common.configuration.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.*;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.writer.RecordEvent;
/**
* Reads monitoring records from the queue of an established RabbitMQ
* connection.
*
* @author Santje Finke
*
* @since 1.8
*/
@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = TCPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { Object.class }, description = "Output Port of the JMSReader") }, configuration = {
@Property(name = TCPReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"),
@Property(name = TCPReader.CONFIG_PROPERTY_PORT, defaultValue = "10133")
})
public final class TCPReader extends AbstractReaderPlugin {
public final class TCPReader {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE];
/** The name of the output port delivering the received records. */
public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
/** The name of the configuration determining the RabbitMQ provider URL. */
public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl";
/** The port that is used to connect to a queue. */
public static final String CONFIG_PROPERTY_PORT = "10133";
public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "127.0.0.1";
public static final int CONFIG_PROPERTY_PORT = 10133;
static final Log LOG = LogFactory
.getLog(TCPReader.class); // NOPMD
// package
// for
// inner
// class
.getLog(TCPReader.class);
private final String providerUrl;
private final int port;
private final CountDownLatch cdLatch = new CountDownLatch(
1);
private ServerSocket serversocket;
private boolean active = true;
private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>(
16,
0.75f,
2);
private final TestFilter eventHandler1;
private ServerSocket serversocket;
private final boolean active = true;
private final RingBuffer<RecordEvent> ringBuffer;
/**
* Creates a new instance of this class using the given parameters.
*
* @param configuration
* The configuration used to initialize the whole reader. Keep in
* mind that the configuration should contain the following
* properties:
* <ul>
* <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL},
* e.g. {@code localhost}
* <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g.
* {@code queue1}
* <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g.
* {@code password}
* <li>The property {@link #CONFIG_PROPERTY_USER}, e.g.
* {@code username}
* <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g.
* {@code port}
* </ul>
* @param projectContext
* The project context for this component.
*
* @throws IllegalArgumentException
* If one of the properties is empty.
*/
public TCPReader(final Configuration configuration,
final IProjectContext projectContext)
throws IllegalArgumentException {
super(configuration, projectContext);
@SuppressWarnings("unchecked")
public TCPReader() throws IllegalArgumentException {
port = 10133;
// Initialize the reader bases on the given configuration.
providerUrl = configuration
.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL);
port = configuration.getIntProperty(CONFIG_PROPERTY_PORT);
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 8192, exec);
// registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl,
// "registryRecords", username, password, port);
// registryConsumer.start();
eventHandler1 = new TestFilter();
disruptor.handleEventsWith(eventHandler1);
ringBuffer = disruptor.start();
}
/**
* A call to this method is a blocking call.
*
* @return true if the method succeeds, false otherwise.
*/
@Override
public boolean read() {
boolean retVal = true;
public void read() {
try {
open();
while (active) {
@@ -135,29 +69,17 @@ public final class TCPReader extends AbstractReaderPlugin {
final Socket socket = serversocket.accept();
final BufferedInputStream bufferedInputStream = new BufferedInputStream(
socket.getInputStream(), MESSAGE_BUFFER_SIZE);
int readSize = 0;
int toReadOffset = 0;
while ((readSize = bufferedInputStream.read(messages,
toReadOffset, MESSAGE_BUFFER_SIZE - toReadOffset)) != -1) {
final byte[] unreadBytes = messagesfromByteArray(messages,
readSize + toReadOffset);
if (unreadBytes != null) {
toReadOffset = unreadBytes.length;
System.arraycopy(unreadBytes, 0, messages, 0,
toReadOffset);
}
else {
toReadOffset = 0;
}
int readBytes = 0;
while ((readBytes = bufferedInputStream.read(messages, 0,
MESSAGE_BUFFER_SIZE)) != -1) {
putInRingBuffer(messages, readBytes);
}
socket.close();
}
}
catch (final IOException ex) { // NOPMD NOCS
// (IllegalCatchCheck)
catch (final IOException ex) {
LOG.error("Error in read()", ex);
retVal = false;
}
finally {
try {
@@ -167,203 +89,24 @@ public final class TCPReader extends AbstractReaderPlugin {
LOG.error("Error in read()", e);
}
}
return retVal;
}
private void open() throws IOException {
serversocket = new ServerSocket(port);
}
private byte[] messagesfromByteArray(final byte[] b, final int readSize) {
int offset = 0;
while (offset < readSize) {
if ((readSize - offset) < 4) {
return createUnreadBytesArray(b, readSize, offset, false);
}
final int clazzId = UnsafeBits.getInt(b, offset);
offset += 4;
IMonitoringRecord record = null;
switch (clazzId) {
case 0: {
if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final Integer hostnameId = UnsafeBits.getInt(b, offset);
offset += 4;
final long parentTraceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int parentOrderId = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer applicationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new Trace(traceId,
getStringFromRegistry(hostnameId), parentTraceId,
parentOrderId, getStringFromRegistry(applicationId));
break;
}
case 1: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new BeforeOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
case 2: {
if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer causeId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId),
getStringFromRegistry(causeId));
break;
}
case 3: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new AfterOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
case 4: {
if ((readSize - offset) < (4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final Integer mapId = UnsafeBits.getInt(b, offset);
offset += 4;
final int stringLength = UnsafeBits.getInt(b, offset);
offset += 4;
if ((readSize - offset) < stringLength) {
return createUnreadBytesArray(b, readSize, offset - 8,
true);
}
final byte[] stringBytes = new byte[stringLength];
System.arraycopy(b, offset, stringBytes, 0, stringLength);
final String string = new String(stringBytes);
offset += stringLength;
addToRegistry(mapId, string);
break;
}
default: {
LOG.error("unknown class id " + clazzId);
}
}
if ((record != null)
&& !super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) {
LOG.error("deliverRecord returned false");
}
}
return null;
}
private byte[] createUnreadBytesArray(final byte[] b, final int readSize,
int offset, final boolean withClazzId) {
if (withClazzId) {
offset -= 4;
}
final int unreadBytesSize = readSize - offset;
final byte[] unreadBytes = new byte[unreadBytesSize];
System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize);
return unreadBytes;
}
final void unblock() { // NOPMD (package visible for inner class)
cdLatch.countDown();
private void putInRingBuffer(final byte[] message, final int readBytes) {
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
final byte[] toSaveCopy = new byte[readBytes];
System.arraycopy(message, 0, toSaveCopy, 0, readBytes);
valueEvent.setValue(toSaveCopy);
valueEvent.setLength(readBytes);
ringBuffer.publish(hiseq);
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
LOG.info("Shutdown of RabbitMQReader requested.");
unblock();
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration
.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl);
return configuration;
}
public void addToRegistry(final Integer key, final String value) {
stringRegistry.put(key, value);
synchronized (this) {
notifyAll();
}
}
private String getStringFromRegistry(final Integer id) {
String result = stringRegistry.get(id);
while (result == null) {
try {
synchronized (this) {
System.out.println("waiting for " + id);
this.wait();
}
}
catch (final InterruptedException e) {
e.printStackTrace();
}
result = stringRegistry.get(id);
}
return result;
LOG.info("Shutdown of TCPReader requested.");
active = false;
}
}
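The RecordEvent class used to shuttle byte chunks through the Disruptor ring buffer is imported from explorviz.hpc_monitoring.writer but is not shown in this diff. A minimal sketch consistent with its usage here (EVENT_FACTORY, setValue/getValue, setLength/getLength) follows; the field layout is an assumption, not the actual implementation.

// Hypothetical sketch of RecordEvent, inferred from its usage in TCPReader and TestFilter.
import com.lmax.disruptor.EventFactory;

public class RecordEvent {
    // Pre-allocates event slots when the Disruptor ring buffer is created.
    public static final EventFactory<RecordEvent> EVENT_FACTORY = new EventFactory<RecordEvent>() {
        @Override
        public RecordEvent newInstance() {
            return new RecordEvent();
        }
    };

    private byte[] value; // raw bytes copied from the socket read buffer
    private int length;   // number of valid bytes in value

    public byte[] getValue() { return value; }
    public void setValue(final byte[] value) { this.value = value; }

    public int getLength() { return length; }
    public void setLength(final int length) { this.length = length; }
}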
package explorviz.hpc_monitoring.reader;
import java.util.Map;
import javolution.util.FastMap;
import kieker.common.record.IMonitoringRecord;
import com.lmax.disruptor.EventHandler;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.filter.CountingThroughputFilter;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.*;
import explorviz.hpc_monitoring.writer.RecordEvent;
public class TestFilter implements EventHandler<RecordEvent> {
private final CountingThroughputFilter counter = new CountingThroughputFilter();
private final FastMap<Integer, String> stringRegistryInternal = new FastMap<Integer, String>();
private Map<Integer, String> stringRegistry = stringRegistryInternal
.toImmutable()
.value();
private byte[] unreadBytes = null;
@Override
public void onEvent(final RecordEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final byte[] received = event.getValue();
final int receivedLength = event.getLength();
byte[] messages = null;
int messagesLength = 0;
if (unreadBytes != null) {
final int unreadBytesLength = unreadBytes.length;
messagesLength = receivedLength + unreadBytesLength;
messages = new byte[messagesLength];
System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength);
System.arraycopy(received, 0, messages, unreadBytesLength,
receivedLength);
}
else {
messages = received;
messagesLength = receivedLength;
}
unreadBytes = messagesfromByteArray(messages, messagesLength);
counter.inputObjects(messages);
}
private byte[] messagesfromByteArray(final byte[] b, final int readSize) {
int offset = 0;
while (offset < readSize) {
if ((readSize - offset) < 4) {
return createUnreadBytesArray(b, readSize, offset, false);
}
final int clazzId = UnsafeBits.getInt(b, offset);
offset += 4;
IMonitoringRecord record = null;
switch (clazzId) {
case 0: {
if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final Integer hostnameId = UnsafeBits.getInt(b, offset);
offset += 4;
final long parentTraceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int parentOrderId = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer applicationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new Trace(traceId,
getStringFromRegistry(hostnameId), parentTraceId,
parentOrderId, getStringFromRegistry(applicationId));
break;
}
case 1: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new BeforeOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
case 2: {
if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer causeId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId),
getStringFromRegistry(causeId));
break;
}
case 3: {
if ((readSize - offset) < (8 + 8 + 4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final long timestamp = UnsafeBits.getLong(b, offset);
offset += 8;
final long traceId = UnsafeBits.getLong(b, offset);
offset += 8;
final int orderIndex = UnsafeBits.getInt(b, offset);
offset += 4;
final Integer operationId = UnsafeBits.getInt(b, offset);
offset += 4;
record = new AfterOperationEvent(timestamp, traceId,
orderIndex, getStringFromRegistry(operationId));
break;
}
case 4: {
if ((readSize - offset) < (4 + 4)) {
return createUnreadBytesArray(b, readSize, offset, true);
}
final Integer mapId = UnsafeBits.getInt(b, offset);
offset += 4;
final int stringLength = UnsafeBits.getInt(b, offset);
offset += 4;
if ((readSize - offset) < stringLength) {
return createUnreadBytesArray(b, readSize, offset - 8,
true);
}
final byte[] stringBytes = new byte[stringLength];
System.arraycopy(b, offset, stringBytes, 0, stringLength);
final String string = new String(stringBytes);
offset += stringLength;
addToRegistry(mapId, string);
break;
}
default: {
System.out.println("unknown class id " + clazzId
+ " at offset " + (offset - 4));
return null;
}
}
counter.inputObjects(record);
}
return null;
}
private byte[] createUnreadBytesArray(final byte[] b, final int readSize,
int offset, final boolean withClazzId) {
if (withClazzId) {
offset -= 4;
}
final int unreadBytesSize = readSize - offset;
final byte[] unreadBytes = new byte[unreadBytesSize];
System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize);
return unreadBytes;
}
public void addToRegistry(final Integer key, final String value) {
stringRegistryInternal.put(key, value);
stringRegistry = stringRegistryInternal.toImmutable().value();
System.out.println(value);
synchronized (this) {
notifyAll();
}
}
private String getStringFromRegistry(final Integer id) {
String result = stringRegistry.get(id);
while (result == null) {
try {
synchronized (this) {
System.out.println("waiting for " + id);
this.wait();
}
}
catch (final InterruptedException e) {
e.printStackTrace();
}
result = stringRegistry.get(id);
}
return result;
}
}
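For reference, the wire format that messagesfromByteArray decodes, summarized from the parser above: every record starts with a 4-byte class id, string-valued fields travel as registry ids, and class id 4 carries the registry entries themselves.

// Wire format decoded by messagesfromByteArray (summary of the parser above):
//   every record:   int classId, followed by
//   classId 0  Trace:                      long traceId, int hostnameId, long parentTraceId, int parentOrderId, int applicationId
//   classId 1  BeforeOperationEvent:       long timestamp, long traceId, int orderIndex, int operationId
//   classId 2  AfterFailedOperationEvent:  long timestamp, long traceId, int orderIndex, int operationId, int causeId
//   classId 3  AfterOperationEvent:        long timestamp, long traceId, int orderIndex, int operationId
//   classId 4  registry entry:             int mapId, int stringLength, byte[stringLength] string bytes (platform default charset)
// Incomplete records at the end of a chunk are carried over to the next chunk via createUnreadBytesArray and unreadBytes.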
@@ -17,24 +17,20 @@ class WorkerController {
var IAnalysisController analysisInstance
def startWithCountingRecordsThroughput() {
analysisInstance = new AnalysisController()
// analysisInstance = new AnalysisController()
val tcpReader = initTCPReader()
val countingThroughputFilter = initCountingThroughputFilter()
val teeFilter = initTeeFilter()
analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter,
CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
analysisInstance.connect(countingThroughputFilter,
CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
TeeFilter::INPUT_PORT_NAME_EVENTS)
tcpReader.read();
try {
analysisInstance.run()
} catch (Exception e) {
e.printStackTrace
}
// val countingThroughputFilter = initCountingThroughputFilter()
// val teeFilter = initTeeFilter()
// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter,
// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
//
// analysisInstance.connect(countingThroughputFilter,
// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
// TeeFilter::INPUT_PORT_NAME_EVENTS)
}
def startWithCountingTracesThroughput() {
@@ -46,16 +42,16 @@ class WorkerController {
val countingThroughputFilter = initCountingThroughputFilter()
val teeFilter = initTeeFilter()
analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
analysisInstance.connect(countingThroughputFilter,
CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
TeeFilter::INPUT_PORT_NAME_EVENTS)
// analysisInstance.connect(eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter,
// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
//
// analysisInstance.connect(countingThroughputFilter,
// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
// TeeFilter::INPUT_PORT_NAME_EVENTS)
try {
analysisInstance.run()
@@ -75,8 +71,8 @@ class WorkerController {
val timer = initTimer()
val tcpConnector = initTCPConnector()
analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
analysisInstance.connect(eventTraceReconstructionFilter,
EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
@@ -99,9 +95,7 @@ class WorkerController {
}
def initTCPReader() {
val config = new Configuration()
config.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1")
new TCPReader(config, analysisInstance)
new TCPReader()
}
def initEventRecordTraceReconstructionFilter() {
@@ -122,9 +116,9 @@ class WorkerController {
}
def initCountingThroughputFilter() {
val config = new Configuration()
config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000")
new CountingThroughputFilter(config, analysisInstance)
// val config = new Configuration()
// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000")
new CountingThroughputFilter()
}
def initTeeFilter() {