diff --git a/.classpath b/.classpath index f0138a29495626303916ca763b8d879b126948b5..05566e3143dee15554b59748e680eb84d92f58dc 100644 --- a/.classpath +++ b/.classpath @@ -6,5 +6,11 @@ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/> <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT_fast.jar"/> + <classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"/> + <classpathentry kind="lib" path="lib/javolution-core-java-6.0.0-20130804.010711-11.jar"> + <attributes> + <attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar!/"/> + </attributes> + </classpathentry> <classpathentry kind="output" path="bin"/> </classpath> diff --git a/lib/disruptor-3.2.0.jar b/lib/disruptor-3.2.0.jar new file mode 100644 index 0000000000000000000000000000000000000000..a39222cde682edc5e89ffbad34e4d70d9b803a39 Binary files /dev/null and b/lib/disruptor-3.2.0.jar differ diff --git a/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar b/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar new file mode 100644 index 0000000000000000000000000000000000000000..52b013030555bc0bc49fa386b11393b546a3a47e Binary files /dev/null and b/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar differ diff --git a/lib/javolution-core-java-6.0.0-20130804.010711-11.jar b/lib/javolution-core-java-6.0.0-20130804.010711-11.jar new file mode 100644 index 0000000000000000000000000000000000000000..fbb64299c962959eebc6fdd29f79e821c91040fa Binary files /dev/null and b/lib/javolution-core-java-6.0.0-20130804.010711-11.jar differ diff --git a/lib/kieker-1.8-SNAPSHOT.jar b/lib/kieker-1.8-SNAPSHOT.jar deleted file mode 100644 index b8c383c52d067acb21270a8abe4f0154971724b6..0000000000000000000000000000000000000000 Binary files a/lib/kieker-1.8-SNAPSHOT.jar and /dev/null differ diff --git a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java index a9eca6ea0af8a72cd192c91bad90a9491681d423..7988e7468312bece03c79809ee30a3ba1859c1ef 100644 --- a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java +++ b/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java @@ -16,170 +16,25 @@ package explorviz.hpc_monitoring.filter; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.IMonitoringRecord; // import kieker.common.util.ImmutableEntry; -/** - * An instance of this class computes the throughput in terms of the number of - * events received per time unit. - * - * Note that only one of the input ports should be used in a configuration! - * - * @author Andre van Hoorn, Jan Waller - * - * @since 1.6 - */ -@Plugin(description = "A filter computing the throughput in terms of the number of events received per time unit", outputPorts = { - @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, eventTypes = { Object.class }, description = "Provides each incoming object"), - @OutputPort(name = CountingThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, eventTypes = { Object.class }, description = "Provides throughput per interval") }, configuration = { - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVAL_SIZE, defaultValue = CountingThroughputFilter.CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE), - @Property(name = CountingThroughputFilter.CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, defaultValue = "true") }) -public final class CountingThroughputFilter extends AbstractFilterPlugin { - - /** - * The name of the input port receiving the records. - */ - public static final String INPUT_PORT_NAME_RECORDS = "inputRecords"; - /** - * The name of the input port receiving other objects. - */ - public static final String INPUT_PORT_NAME_OBJECTS = "inputObjects"; - - /** - * The name of the output port delivering the received objects. - */ - public static final String OUTPUT_PORT_NAME_RELAYED_OBJECTS = "relayedEvents"; - public static final String OUTPUT_PORT_NAME_THROUGHPUT = "throughputPerInterval"; - - /** The name of the property determining the time unit. */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - /** The name of the property determining the interval size. */ - public static final String CONFIG_PROPERTY_NAME_INTERVAL_SIZE = "intervalSize"; - - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - /** - * If the value is set to false, the intervals are computed based on time - * since 1970-1-1. - */ - public static final String CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP = "intervalsBasedOn1stTstamp"; - - /** - * The configuration property value for - * {@link #CONFIG_PROPERTY_NAME_INTERVAL_SIZE}, leading to a bin size of 1 - * minute. - */ - public static final String CONFIG_PROPERTY_VALUE_INTERVAL_SIZE_ONE_MINUTE = "60000000000"; - - private static final Log LOG = LogFactory - .getLog(CountingThroughputFilter.class); +public final class CountingThroughputFilter { + private volatile long firstIntervalStart = -1; + private final TimeUnit timeunit; - private volatile long firstIntervalStart = -1; - private final boolean intervalsBasedOn1stTstamp; - private final TimeUnit timeunit; + private final long intervalSize; - /** - * For a key <i>k</i>, the {@link Queue} stores the number of events - * observed in the time interval <i>(k-intervalSize,k(</i>, i.e., - * the interval <b>excludes</b> the value <i>k</i>. - */ - private final Queue<Entry<Long, Long>> eventCountsPerInterval = new ConcurrentLinkedQueue<Entry<Long, Long>>(); + private final AtomicLong currentCountForCurrentInterval = new AtomicLong(0); - private final long intervalSize; + private volatile long firstTimestampInCurrentInterval = -1; + private volatile long lastTimestampInCurrentInterval = -1; - private final AtomicLong currentCountForCurrentInterval = new AtomicLong( - 0); - - private volatile long firstTimestampInCurrentInterval = -1; // initialized - // with - // the - // first - // incoming - // event - private volatile long lastTimestampInCurrentInterval = -1; // initialized - // with - // the - // first - // incoming - // event - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public CountingThroughputFilter(final Configuration configuration, - final IProjectContext projectContext) { - super(configuration, projectContext); - - final String recordTimeunitProperty = projectContext - .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } - catch (final IllegalArgumentException ex) { // already caught in - // AnalysisController, - // should never happen - LOG.warn(recordTimeunitProperty - + " is no valid TimeUnit! Using NANOSECONDS instead."); - recordTimeunit = TimeUnit.NANOSECONDS; - } - timeunit = recordTimeunit; - - final String configTimeunitProperty = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); - TimeUnit configTimeunit; - try { - configTimeunit = TimeUnit.valueOf(configTimeunitProperty); - } - catch (final IllegalArgumentException ex) { - LOG.warn(configTimeunitProperty - + " is no valid TimeUnit! Using inherited value of " - + timeunit.name() + " instead."); - configTimeunit = timeunit; - } - - intervalSize = timeunit.convert(configuration - .getLongProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE), - configTimeunit); - intervalsBasedOn1stTstamp = configuration - .getBooleanProperty(CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP); - } - - /** - * {@inheritDoc} - */ - @Override - public final Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, - timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_INTERVAL_SIZE, - Long.toString(intervalSize)); - configuration.setProperty( - CONFIG_PROPERTY_NAME_INTERVALS_BASED_ON_1ST_TSTAMP, - Boolean.toString(intervalsBasedOn1stTstamp)); - return configuration; + public CountingThroughputFilter() { + timeunit = TimeUnit.NANOSECONDS; + intervalSize = timeunit.convert(1000000000, TimeUnit.NANOSECONDS); } private void processEvent(final Object event, final long currentTime) { @@ -187,33 +42,11 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin { final long endOfTimestampsInterval = computeLastTimestampInInterval(currentTime); synchronized (this) { - // Check if we need to close the current interval. if (endOfTimestampsInterval > lastTimestampInCurrentInterval) { - if (firstTimestampInCurrentInterval >= 0) { // don't do this for - // the first record - // (only used for - // initialization of - // variables) - long currentCount = currentCountForCurrentInterval.get(); - // this.eventCountsPerInterval.add( - // new ImmutableEntry<Long, Long>( - // this.lastTimestampInCurrentInterval + 1, - // currentCount)); - super.deliver(OUTPUT_PORT_NAME_THROUGHPUT, currentCount); - - // long numIntervalsElapsed = 1; // refined below - // numIntervalsElapsed = (endOfTimestampsInterval - - // this.lastTimestampInCurrentInterval) / this.intervalSize; - // if (numIntervalsElapsed > 1) { // NOPMD - // (AvoidDeeplyNestedIfStmts) - // for (int i = 1; i < numIntervalsElapsed; i++) { - // this.eventCountsPerInterval.add( - // new ImmutableEntry<Long, - // Long>((this.lastTimestampInCurrentInterval + (i * - // this.intervalSize)) + 1, 0L)); - // } - // } - + if (firstTimestampInCurrentInterval >= 0) { + final long currentCount = currentCountForCurrentInterval + .get(); + System.out.println(currentCount); } firstTimestampInCurrentInterval = startOfTimestampsInterval; @@ -221,61 +54,19 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin { currentCountForCurrentInterval.set(0); } - currentCountForCurrentInterval.incrementAndGet(); // only - // incremented in - // synchronized - // blocks + currentCountForCurrentInterval.incrementAndGet(); } - super.deliver(OUTPUT_PORT_NAME_RELAYED_OBJECTS, event); - } - - /** - * This method represents the input port for incoming records. - * - * @param record - * The next record. - */ - // #841 What happens with unordered events (i.e., timestamps before - // firstTimestampInCurrentInterval)? - @InputPort(name = INPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Receives incoming monitoring records to be considered for the throughput computation and uses the record's logging timestamp") - public final void inputRecord(final IMonitoringRecord record) { - processEvent(record, record.getLoggingTimestamp()); } - /** - * This method represents the input port for incoming object. - * - * @param object - * The next object. - */ - @InputPort(name = INPUT_PORT_NAME_OBJECTS, eventTypes = { Object.class }, description = "Receives incoming objects to be considered for the throughput computation and uses the current system time") public final void inputObjects(final Object object) { processEvent(object, currentTime()); } - /** - * Returns the current time in {@link TimeUnit#MILLISECONDS} since 1970. - * - * @return The current time - */ private long currentTime() { return timeunit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } - // #840 is this correct? it probably makes more sense to provide a copy. - public Collection<Entry<Long, Long>> getCountsPerInterval() { - return Collections.unmodifiableCollection(eventCountsPerInterval); - } - - /** - * Returns the first timestamp included in the interval that corresponds to - * the given timestamp. - * - * @param timestamp - * - * @return The timestamp in question. - */ private long computeFirstTimestampInInterval(final long timestamp) { final long referenceTimePoint; @@ -283,64 +74,17 @@ public final class CountingThroughputFilter extends AbstractFilterPlugin { firstIntervalStart = timestamp; } - if (intervalsBasedOn1stTstamp) { - referenceTimePoint = firstIntervalStart; - } - else { - referenceTimePoint = 0; - } + referenceTimePoint = 0; return referenceTimePoint + (((timestamp - referenceTimePoint) / intervalSize) * intervalSize); } - /** - * Returns the last timestamp included in the interval that corresponds to - * the given timestamp. - * - * @param timestamp - * @return The timestamp in question. - */ private long computeLastTimestampInInterval(final long timestamp) { final long referenceTimePoint; - if (intervalsBasedOn1stTstamp) { - referenceTimePoint = firstIntervalStart; - } - else { - referenceTimePoint = 0; - } + referenceTimePoint = 0; return referenceTimePoint + (((((timestamp - referenceTimePoint) / intervalSize) + 1) * intervalSize) - 1); } - - /** - * @return the intervalSize - */ - public long getIntervalSize() { - return intervalSize; - } - - /** - * @return the firstTimestampInCurrentInterval -1 if no record processed so - * far - */ - public long getFirstTimestampInCurrentInterval() { - return firstTimestampInCurrentInterval; - } - - /** - * @return the lastTimestampInCurrentInterval -1 if no record processed so - * far - */ - public long getLastTimestampInCurrentInterval() { - return lastTimestampInCurrentInterval; - } - - /** - * @return the currentCountForCurrentInterval - */ - public long getCurrentCountForCurrentInterval() { - return currentCountForCurrentInterval.get(); - } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index a6bb4d63c8539c160db0613b09db93780aa1776f..9165731fbf60270be08554eedfd6a3bccf19a793 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -20,114 +20,48 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.reader.AbstractReaderPlugin; -import kieker.common.configuration.Configuration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; -import kieker.common.record.IMonitoringRecord; -import explorviz.hpc_monitoring.byteaccess.UnsafeBits; -import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.*; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import explorviz.hpc_monitoring.writer.RecordEvent; -/** - * Reads monitoring records from the queue of an established RabbitMQ - * connection. - * - * @author Santje Finke - * - * @since 1.8 - */ -@Plugin(description = "A reader which reads records from a RabbitMQ queue", dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", outputPorts = { @OutputPort(name = TCPReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { Object.class }, description = "Output Port of the JMSReader") }, configuration = { - @Property(name = TCPReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), - @Property(name = TCPReader.CONFIG_PROPERTY_PORT, defaultValue = "10133") +public final class TCPReader { + private static final int MESSAGE_BUFFER_SIZE = 65536; -}) -public final class TCPReader extends AbstractReaderPlugin { - private static final int MESSAGE_BUFFER_SIZE = 65536; + private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; - private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; + public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "127.0.0.1"; + public static final int CONFIG_PROPERTY_PORT = 10133; - /** The name of the output port delivering the received records. */ - public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; - /** The name of the configuration determining the RabbitMQ provider URL. */ - public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; - /** The port that is used to connect to a queue. */ - public static final String CONFIG_PROPERTY_PORT = "10133"; + static final Log LOG = LogFactory + .getLog(TCPReader.class); - static final Log LOG = LogFactory - .getLog(TCPReader.class); // NOPMD - // package - // for - // inner - // class + private final int port; - private final String providerUrl; - private final int port; + private ServerSocket serversocket; + private boolean active = true; - private final CountDownLatch cdLatch = new CountDownLatch( - 1); + private final TestFilter eventHandler1; - private final Map<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>( - 16, - 0.75f, - 2); + private final RingBuffer<RecordEvent> ringBuffer; - private ServerSocket serversocket; - private final boolean active = true; + @SuppressWarnings("unchecked") + public TCPReader() throws IllegalArgumentException { + port = 10133; - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration used to initialize the whole reader. Keep in - * mind that the configuration should contain the following - * properties: - * <ul> - * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, - * e.g. {@code localhost} - * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. - * {@code queue1} - * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. - * {@code password} - * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. - * {@code username} - * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. - * {@code port} - * </ul> - * @param projectContext - * The project context for this component. - * - * @throws IllegalArgumentException - * If one of the properties is empty. - */ - public TCPReader(final Configuration configuration, - final IProjectContext projectContext) - throws IllegalArgumentException { - super(configuration, projectContext); + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 8192, exec); - // Initialize the reader bases on the given configuration. - providerUrl = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); - port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); - - // registryConsumer = new RabbitMQRegistryConsumer(this, providerUrl, - // "registryRecords", username, password, port); - // registryConsumer.start(); + eventHandler1 = new TestFilter(); + disruptor.handleEventsWith(eventHandler1); + ringBuffer = disruptor.start(); } - /** - * A call to this method is a blocking call. - * - * @return true if the method succeeds, false otherwise. - */ - @Override - public boolean read() { - boolean retVal = true; + public void read() { try { open(); while (active) { @@ -135,29 +69,17 @@ public final class TCPReader extends AbstractReaderPlugin { final Socket socket = serversocket.accept(); final BufferedInputStream bufferedInputStream = new BufferedInputStream( socket.getInputStream(), MESSAGE_BUFFER_SIZE); - int readSize = 0; - int toReadOffset = 0; - while ((readSize = bufferedInputStream.read(messages, - toReadOffset, MESSAGE_BUFFER_SIZE - toReadOffset)) != -1) { - final byte[] unreadBytes = messagesfromByteArray(messages, - readSize + toReadOffset); - if (unreadBytes != null) { - toReadOffset = unreadBytes.length; - System.arraycopy(unreadBytes, 0, messages, 0, - toReadOffset); - } - else { - toReadOffset = 0; - } + int readBytes = 0; + while ((readBytes = bufferedInputStream.read(messages, 0, + MESSAGE_BUFFER_SIZE)) != -1) { + putInRingBuffer(messages, readBytes); } socket.close(); } } - catch (final IOException ex) { // NOPMD NOCS - // (IllegalCatchCheck) + catch (final IOException ex) { LOG.error("Error in read()", ex); - retVal = false; } finally { try { @@ -167,203 +89,24 @@ public final class TCPReader extends AbstractReaderPlugin { LOG.error("Error in read()", e); } } - return retVal; } private void open() throws IOException { serversocket = new ServerSocket(port); } - private byte[] messagesfromByteArray(final byte[] b, final int readSize) { - int offset = 0; - - while (offset < readSize) { - if ((readSize - offset) < 4) { - return createUnreadBytesArray(b, readSize, offset, false); - } - - final int clazzId = UnsafeBits.getInt(b, offset); - offset += 4; - - IMonitoringRecord record = null; - - switch (clazzId) { - case 0: { - if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final Integer hostnameId = UnsafeBits.getInt(b, offset); - offset += 4; - final long parentTraceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int parentOrderId = UnsafeBits.getInt(b, offset); - offset += 4; - final Integer applicationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new Trace(traceId, - getStringFromRegistry(hostnameId), parentTraceId, - parentOrderId, getStringFromRegistry(applicationId)); - break; - } - case 1: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final Integer operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new BeforeOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId)); - break; - } - case 2: { - if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final Integer operationId = UnsafeBits.getInt(b, offset); - offset += 4; - final Integer causeId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new AfterFailedOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId), - getStringFromRegistry(causeId)); - break; - } - case 3: { - if ((readSize - offset) < (8 + 8 + 4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final long timestamp = UnsafeBits.getLong(b, offset); - offset += 8; - final long traceId = UnsafeBits.getLong(b, offset); - offset += 8; - final int orderIndex = UnsafeBits.getInt(b, offset); - offset += 4; - final Integer operationId = UnsafeBits.getInt(b, offset); - offset += 4; - - record = new AfterOperationEvent(timestamp, traceId, - orderIndex, getStringFromRegistry(operationId)); - break; - } - case 4: { - if ((readSize - offset) < (4 + 4)) { - return createUnreadBytesArray(b, readSize, offset, true); - } - - final Integer mapId = UnsafeBits.getInt(b, offset); - offset += 4; - final int stringLength = UnsafeBits.getInt(b, offset); - offset += 4; - - if ((readSize - offset) < stringLength) { - return createUnreadBytesArray(b, readSize, offset - 8, - true); - } - - final byte[] stringBytes = new byte[stringLength]; - System.arraycopy(b, offset, stringBytes, 0, stringLength); - final String string = new String(stringBytes); - offset += stringLength; - - addToRegistry(mapId, string); - - break; - } - default: { - LOG.error("unknown class id " + clazzId); - } - } - - if ((record != null) - && !super.deliver(OUTPUT_PORT_NAME_RECORDS, record)) { - LOG.error("deliverRecord returned false"); - } - } - - return null; + private void putInRingBuffer(final byte[] message, final int readBytes) { + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + final byte[] toSaveCopy = new byte[readBytes]; + System.arraycopy(message, 0, toSaveCopy, 0, readBytes); + valueEvent.setValue(toSaveCopy); + valueEvent.setLength(readBytes); + ringBuffer.publish(hiseq); } - private byte[] createUnreadBytesArray(final byte[] b, final int readSize, - int offset, final boolean withClazzId) { - if (withClazzId) { - offset -= 4; - } - final int unreadBytesSize = readSize - offset; - final byte[] unreadBytes = new byte[unreadBytesSize]; - System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); - return unreadBytes; - } - - final void unblock() { // NOPMD (package visible for inner class) - cdLatch.countDown(); - } - - /** - * {@inheritDoc} - */ - @Override public void terminate(final boolean error) { - LOG.info("Shutdown of RabbitMQReader requested."); - unblock(); - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - - configuration - .setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, providerUrl); - - return configuration; - } - - public void addToRegistry(final Integer key, final String value) { - stringRegistry.put(key, value); - - synchronized (this) { - notifyAll(); - } - } - - private String getStringFromRegistry(final Integer id) { - String result = stringRegistry.get(id); - while (result == null) { - try { - synchronized (this) { - System.out.println("waiting for " + id); - this.wait(); - } - } - catch (final InterruptedException e) { - e.printStackTrace(); - } - result = stringRegistry.get(id); - } - - return result; + LOG.info("Shutdown of TCPReader requested."); + active = false; } } diff --git a/src/explorviz/hpc_monitoring/reader/TestFilter.java b/src/explorviz/hpc_monitoring/reader/TestFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..6281074bffb3114cf0c779fe7c21c8b0b50b6715 --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/TestFilter.java @@ -0,0 +1,219 @@ +package explorviz.hpc_monitoring.reader; + +import java.util.Map; +import javolution.util.FastMap; +import kieker.common.record.IMonitoringRecord; +import com.lmax.disruptor.EventHandler; +import explorviz.hpc_monitoring.byteaccess.UnsafeBits; +import explorviz.hpc_monitoring.filter.CountingThroughputFilter; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.events.*; +import explorviz.hpc_monitoring.writer.RecordEvent; + +public class TestFilter implements EventHandler<RecordEvent> { + + private final CountingThroughputFilter counter = new CountingThroughputFilter(); + + private final FastMap<Integer, String> stringRegistryInternal = new FastMap<Integer, String>(); + private Map<Integer, String> stringRegistry = stringRegistryInternal + .toImmutable() + .value(); + + private byte[] unreadBytes = null; + + @Override + public void onEvent(final RecordEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + final byte[] received = event.getValue(); + final int receivedLength = event.getLength(); + + byte[] messages = null; + int messagesLength = 0; + + if (unreadBytes != null) { + final int unreadBytesLength = unreadBytes.length; + + messagesLength = receivedLength + unreadBytesLength; + messages = new byte[messagesLength]; + + System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); + System.arraycopy(received, 0, messages, unreadBytesLength, + receivedLength); + } + else { + messages = received; + messagesLength = receivedLength; + } + + unreadBytes = messagesfromByteArray(messages, messagesLength); + counter.inputObjects(messages); + } + + private byte[] messagesfromByteArray(final byte[] b, final int readSize) { + int offset = 0; + + while (offset < readSize) { + if ((readSize - offset) < 4) { + return createUnreadBytesArray(b, readSize, offset, false); + } + + final int clazzId = UnsafeBits.getInt(b, offset); + offset += 4; + + IMonitoringRecord record = null; + + switch (clazzId) { + case 0: { + if ((readSize - offset) < (8 + 4 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final Integer hostnameId = UnsafeBits.getInt(b, offset); + offset += 4; + final long parentTraceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int parentOrderId = UnsafeBits.getInt(b, offset); + offset += 4; + final Integer applicationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new Trace(traceId, + getStringFromRegistry(hostnameId), parentTraceId, + parentOrderId, getStringFromRegistry(applicationId)); + break; + } + case 1: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final Integer operationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new BeforeOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + case 2: { + if ((readSize - offset) < (8 + 8 + 4 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final Integer operationId = UnsafeBits.getInt(b, offset); + offset += 4; + final Integer causeId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new AfterFailedOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId), + getStringFromRegistry(causeId)); + break; + } + case 3: { + if ((readSize - offset) < (8 + 8 + 4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final long timestamp = UnsafeBits.getLong(b, offset); + offset += 8; + final long traceId = UnsafeBits.getLong(b, offset); + offset += 8; + final int orderIndex = UnsafeBits.getInt(b, offset); + offset += 4; + final Integer operationId = UnsafeBits.getInt(b, offset); + offset += 4; + + record = new AfterOperationEvent(timestamp, traceId, + orderIndex, getStringFromRegistry(operationId)); + break; + } + case 4: { + if ((readSize - offset) < (4 + 4)) { + return createUnreadBytesArray(b, readSize, offset, true); + } + + final Integer mapId = UnsafeBits.getInt(b, offset); + offset += 4; + final int stringLength = UnsafeBits.getInt(b, offset); + offset += 4; + + if ((readSize - offset) < stringLength) { + return createUnreadBytesArray(b, readSize, offset - 8, + true); + } + + final byte[] stringBytes = new byte[stringLength]; + System.arraycopy(b, offset, stringBytes, 0, stringLength); + final String string = new String(stringBytes); + offset += stringLength; + + addToRegistry(mapId, string); + + break; + } + default: { + System.out.println("unknown class id " + clazzId + + " at offset " + (offset - 4)); + return null; + } + } + + counter.inputObjects(record); + } + + return null; + } + + private byte[] createUnreadBytesArray(final byte[] b, final int readSize, + int offset, final boolean withClazzId) { + if (withClazzId) { + offset -= 4; + } + final int unreadBytesSize = readSize - offset; + final byte[] unreadBytes = new byte[unreadBytesSize]; + System.arraycopy(b, offset, unreadBytes, 0, unreadBytesSize); + return unreadBytes; + } + + public void addToRegistry(final Integer key, final String value) { + stringRegistryInternal.put(key, value); + stringRegistry = stringRegistryInternal.toImmutable().value(); + System.out.println(value); + + synchronized (this) { + notifyAll(); + } + } + + private String getStringFromRegistry(final Integer id) { + String result = stringRegistry.get(id); + while (result == null) { + try { + synchronized (this) { + System.out.println("waiting for " + id); + this.wait(); + } + } + catch (final InterruptedException e) { + e.printStackTrace(); + } + result = stringRegistry.get(id); + } + + return result; + } +} diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index ff55fb9618b7a6c57324fad2a9a3c44d111470b5..add695dcba2d226ef1b73c7ac49d7806af166a47 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -17,24 +17,20 @@ class WorkerController { var IAnalysisController analysisInstance def startWithCountingRecordsThroughput() { - analysisInstance = new AnalysisController() +// analysisInstance = new AnalysisController() val tcpReader = initTCPReader() - - val countingThroughputFilter = initCountingThroughputFilter() - val teeFilter = initTeeFilter() - analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, - CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - - analysisInstance.connect(countingThroughputFilter, - CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, - TeeFilter::INPUT_PORT_NAME_EVENTS) - - try { - analysisInstance.run() - } catch (Exception e) { - e.printStackTrace - } + + tcpReader.read(); + +// val countingThroughputFilter = initCountingThroughputFilter() +// val teeFilter = initTeeFilter() +// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, +// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) +// +// analysisInstance.connect(countingThroughputFilter, +// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, +// TeeFilter::INPUT_PORT_NAME_EVENTS) } def startWithCountingTracesThroughput() { @@ -46,16 +42,16 @@ class WorkerController { val countingThroughputFilter = initCountingThroughputFilter() val teeFilter = initTeeFilter() - analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) +// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, - CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) - - analysisInstance.connect(countingThroughputFilter, - CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, - TeeFilter::INPUT_PORT_NAME_EVENTS) +// analysisInstance.connect(eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, +// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) +// +// analysisInstance.connect(countingThroughputFilter, +// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, +// TeeFilter::INPUT_PORT_NAME_EVENTS) try { analysisInstance.run() @@ -75,8 +71,8 @@ class WorkerController { val timer = initTimer() val tcpConnector = initTCPConnector() - analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) +// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) analysisInstance.connect(eventTraceReconstructionFilter, EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, @@ -99,9 +95,7 @@ class WorkerController { } def initTCPReader() { - val config = new Configuration() - config.setProperty(TCPReader::CONFIG_PROPERTY_NAME_PROVIDERURL, "127.0.0.1") - new TCPReader(config, analysisInstance) + new TCPReader() } def initEventRecordTraceReconstructionFilter() { @@ -122,9 +116,9 @@ class WorkerController { } def initCountingThroughputFilter() { - val config = new Configuration() - config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") - new CountingThroughputFilter(config, analysisInstance) +// val config = new Configuration() +// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") + new CountingThroughputFilter() } def initTeeFilter() {