diff --git a/.classpath b/.classpath index 05566e3143dee15554b59748e680eb84d92f58dc..46938154629c35233d71aa842b90be2a7030041b 100644 --- a/.classpath +++ b/.classpath @@ -1,15 +1,10 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry kind="con" path="org.eclipse.xtend.XTEND_CONTAINER"/> - <classpathentry kind="src" path="xtend-gen"/> <classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> - <classpathentry combineaccessrules="false" kind="src" path="/monitored-application"/> - <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT_fast.jar"/> - <classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"/> - <classpathentry kind="lib" path="lib/javolution-core-java-6.0.0-20130804.010711-11.jar"> + <classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"> <attributes> - <attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar!/"/> + <attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/disruptor-3.2.0-javadoc.jar!/"/> </attributes> </classpathentry> <classpathentry kind="output" path="bin"/> diff --git a/.project b/.project index 3dd2328f435a0fef8e2a084a490f444ff5d58da1..ea4c13bf9c36c1d3f8f3e4eca61afc379b1cb60b 100644 --- a/.project +++ b/.project @@ -5,11 +5,6 @@ <projects> </projects> <buildSpec> - <buildCommand> - <name>org.eclipse.xtext.ui.shared.xtextBuilder</name> - <arguments> - </arguments> - </buildCommand> <buildCommand> <name>org.eclipse.jdt.core.javabuilder</name> <arguments> @@ -18,6 +13,5 @@ </buildSpec> <natures> <nature>org.eclipse.jdt.core.javanature</nature> - <nature>org.eclipse.xtext.ui.shared.xtextNature</nature> </natures> </projectDescription> diff --git a/lib/disruptor-3.2.0-javadoc.jar b/lib/disruptor-3.2.0-javadoc.jar new file mode 100644 index 0000000000000000000000000000000000000000..8478a02a1237038b547ffcd7436883ecbc4017e3 Binary files /dev/null and b/lib/disruptor-3.2.0-javadoc.jar differ diff --git a/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar b/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar deleted file mode 100644 index 52b013030555bc0bc49fa386b11393b546a3a47e..0000000000000000000000000000000000000000 Binary files a/lib/javolution-core-java-6.0.0-20130804.010711-11-javadoc.jar and /dev/null differ diff --git a/lib/javolution-core-java-6.0.0-20130804.010711-11.jar b/lib/javolution-core-java-6.0.0-20130804.010711-11.jar deleted file mode 100644 index fbb64299c962959eebc6fdd29f79e821c91040fa..0000000000000000000000000000000000000000 Binary files a/lib/javolution-core-java-6.0.0-20130804.010711-11.jar and /dev/null differ diff --git a/src/explorviz/hpc_monitoring/byteaccess/UnsafeBits.java b/src/explorviz/hpc_monitoring/byteaccess/UnsafeBits.java new file mode 100644 index 0000000000000000000000000000000000000000..8ce42a07f961129e7202058fb73cf470b1d0ea32 --- /dev/null +++ b/src/explorviz/hpc_monitoring/byteaccess/UnsafeBits.java @@ -0,0 +1,38 @@ +package explorviz.hpc_monitoring.byteaccess; + +import java.lang.reflect.Field; +import sun.misc.Unsafe; + +public class UnsafeBits { + private static final Unsafe unsafe; + static { + try { + final Field field = Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + unsafe = (Unsafe) field.get(null); + } + catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static final long byteArrayOffset = unsafe.arrayBaseOffset(byte[].class); + + public static final void putInt(final byte[] buffer, final int pos, + final int value) { + unsafe.putInt(buffer, byteArrayOffset + pos, value); + } + + public static final int getInt(final byte[] buffer, final int pos) { + return unsafe.getInt(buffer, byteArrayOffset + pos); + } + + public static final void putLong(final byte[] buffer, final int pos, + final long value) { + unsafe.putLong(buffer, byteArrayOffset + pos, value); + } + + public static final long getLong(final byte[] buffer, final int pos) { + return unsafe.getLong(buffer, byteArrayOffset + pos); + } +} \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/connector/TCPConnector.java b/src/explorviz/hpc_monitoring/connector/TCPConnector.java index 0f552c60c9a4e59148af2ab617d947eb543b134d..4719002ce510436420c42f34f5751b7e895ee73e 100644 --- a/src/explorviz/hpc_monitoring/connector/TCPConnector.java +++ b/src/explorviz/hpc_monitoring/connector/TCPConnector.java @@ -1,216 +1,85 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - package explorviz.hpc_monitoring.connector; -import java.io.*; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.IMonitoringRecord; -import explorviz.hpc_monitoring.byteaccess.UnsafeBits; - -/** - * A plugin used for kieker in the cloud. - * All incoming events are put into a RabbitMQ, but are also passed to an output - * port that can be used for - * testing purposes. - * - * @author Santje Finke - * - * @since 1.8 - * - */ -@Plugin(description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", outputPorts = { @OutputPort(name = TCPConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, description = "Provides each incoming object") }, configuration = { - @Property(name = TCPConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), - @Property(name = TCPConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), - @Property(name = TCPConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), - @Property(name = TCPConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") }) -public class TCPConnector extends AbstractFilterPlugin { - private static final int MESSAGE_BUFFER_SIZE = 1500; - - /** - * The name of the input port receiving the incoming events. - */ - public static final String INPUT_PORT_NAME_INVALID_TRACES = "inputInvalidTraces"; - public static final String INPUT_PORT_NAME_VALID_TRACES = "inputValidTraces"; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.Socket; +import com.lmax.disruptor.EventHandler; +import explorviz.hpc_monitoring.reader.ByteArrayEvent; - /** - * The name of the output port passing the incoming events. - */ - public static final String OUTPUT_PORT_NAME = "relayedEvents"; - /** - * The name of the property determining the address of the used Server. - */ - public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; - /** - * The name of the property determining the name of the Queue. - */ - public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; +public class TCPConnector implements EventHandler<ByteArrayEvent> { + private static final int MESSAGE_BUFFER_SIZE = 65536; - /** - * The username that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_USER = "guest"; - /** - * The password that is used to connect to a queue. - */ - public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + private String providerUrl; + private final int providerPort; - private static final Log LOG = LogFactory - .getLog(TCPConnector.class); + private Socket socket; - private final String providerUrl; + private BufferedOutputStream bufferedOutputStream; - private final byte[] validTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; - private int validTracesMessagesOffset = 0; + public TCPConnector(final String providerUrl, final int providerPort) { + this.providerUrl = providerUrl; + this.providerPort = providerPort; - private final byte[] invalidTracesMessages = new byte[MESSAGE_BUFFER_SIZE]; - private int invalidTracesMessagesOffset = 0; - - public TCPConnector(final Configuration configuration, - final IProjectContext projectContext) { - super(configuration, projectContext); - providerUrl = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); try { - connect(); + connect(providerUrl); } catch (final IOException e) { e.printStackTrace(); } } - private void connect() throws IOException {} - - /** - * This method represents the input port of this filter. - * - * @param event - * The next event. - */ - @InputPort(name = INPUT_PORT_NAME_VALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") - public final void inputValidTraces(final Object monitoringRecord) { - try { - // if (monitoringRecord instanceof IMonitoringRecord) { - // byte[] message2 = toByteArray((IMonitoringRecord) - // monitoringRecord); - - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - - final ObjectOutputStream out = new ObjectOutputStream(boas); - out.writeObject(monitoringRecord); - out.close(); - - final byte[] message2 = boas.toByteArray(); // TODO optimize - - System.arraycopy(message2, 0, validTracesMessages, - validTracesMessagesOffset, message2.length); - validTracesMessagesOffset += message2.length; - - if ((validTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO - // unsafe - // 200 - // || - // writeQueue.isEmpty() - UnsafeBits.putInt(validTracesMessages, - validTracesMessagesOffset, -1); - sendMessage(validTracesMessages, "validTracesMaster"); - validTracesMessagesOffset = 0; - } - } - catch (final IOException e) { - LOG.error("Error sending record", e); - } + private void connect(final String provider) throws IOException { + socket = new Socket(providerUrl, providerPort); + bufferedOutputStream = new BufferedOutputStream( + socket.getOutputStream(), MESSAGE_BUFFER_SIZE); } - /** - * This method represents the input port of this filter. - * - * @param event - * The next event. - */ - @InputPort(name = INPUT_PORT_NAME_INVALID_TRACES, eventTypes = { Object.class }, description = "Receives incoming objects to be forwarded to a queue") - public final void inputInvalidTraces(final Object monitoringRecord) { + public final void sendMessage(final byte[] message, final int length) { try { - // if (monitoringRecord instanceof IMonitoringRecord) { - // byte[] message2 = toByteArray((IMonitoringRecord) - // monitoringRecord); - - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - - final ObjectOutputStream out = new ObjectOutputStream(boas); - out.writeObject(monitoringRecord); - out.close(); - - final byte[] message2 = boas.toByteArray(); // TODO optimize - - System.arraycopy(message2, 0, invalidTracesMessages, - invalidTracesMessagesOffset, message2.length); - invalidTracesMessagesOffset += message2.length; - - if ((invalidTracesMessagesOffset > (MESSAGE_BUFFER_SIZE - MESSAGE_BUFFER_SIZE))) { // TODO - // unsafe - // 200 - // || - // writeQueue.isEmpty() - UnsafeBits.putInt(invalidTracesMessages, - invalidTracesMessagesOffset, -1); - sendMessage(invalidTracesMessages, "invalidTracesMaster"); - invalidTracesMessagesOffset = 0; - } + bufferedOutputStream.write(message, 0, length); + // if (endOfBatch) { + // bufferedOutputStream.flush(); + // } } catch (final IOException e) { - LOG.error("Error sending record", e); + e.printStackTrace(); } } - @SuppressWarnings("unused") - private byte[] toByteArray(final IMonitoringRecord monitoringRecord) { - return new byte[1]; // TODO - } - - private void sendMessage(final byte[] message, final String queueName) - throws IOException { - - synchronized (this) {} - } - - protected final void cleanup() { + public final void cleanup() { disconnect(); } private void disconnect() { - + if (socket.isConnected()) { + try { + socket.close(); + } + catch (final IOException e) { + System.out.println(e.toString()); + } + } } - @Override - public final String toString() { - final StringBuilder sb = new StringBuilder(128); - - return sb.toString(); + public void setProvider(final String provider) { + synchronized (this) { + if (!provider.equals(providerUrl)) { + disconnect(); + try { + connect(provider); + providerUrl = provider; + notifyAll(); + } + catch (final IOException e) { + e.printStackTrace(); + } + } + } } @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, providerUrl); - return configuration; + public void onEvent(final ByteArrayEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + sendMessage(event.getValue(), event.getLength()); } } diff --git a/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java deleted file mode 100644 index 0e92fab443663d624d0cee51de35ba92aec9dff9..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/filter/EventRecordTraceReconstructionFilter.java +++ /dev/null @@ -1,421 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package explorviz.hpc_monitoring.filter; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; -import kieker.common.record.flow.IFlowRecord; -import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.TraceEventRecords; -import explorviz.hpc_monitoring.record.events.*; - -/** - * @author Jan Waller - * - * @since 1.6 - */ -@Plugin(name = "Trace Reconstruction Filter (Event)", description = "Filter to reconstruct event based (flow) traces", outputPorts = { - @OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, description = "Outputs valid traces", eventTypes = { TraceEventRecords.class }), - @OutputPort(name = EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, description = "Outputs traces missing crucial records", eventTypes = { TraceEventRecords.class }) }, configuration = { - @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME), - @Property(name = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, defaultValue = EventRecordTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME) }) -public final class EventRecordTraceReconstructionFilter extends - AbstractFilterPlugin { - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_TRACE_VALID = "validTraces"; - /** - * The name of the output port delivering the invalid traces. - */ - public static final String OUTPUT_PORT_NAME_TRACE_INVALID = "invalidTraces"; - /** - * The name of the input port receiving the trace records. - */ - public static final String INPUT_PORT_NAME_TRACE_RECORDS = "traceRecords"; - - /** - * The name of the property determining the time unit. - */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - /** - * The name of the property determining the maximal trace duration. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration"; - /** - * The name of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout"; - /** - * The default value of the properties for the maximal trace duration and - * timeout. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE) - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - private static final Log LOG = LogFactory - .getLog(EventRecordTraceReconstructionFilter.class); - - private final TimeUnit timeunit; - private final long maxTraceDuration; - private final long maxTraceTimeout; - private final boolean timeout; - private long maxEncounteredLoggingTimestamp = -1; - - private final Map<Long, TraceBuffer> traceId2trace; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param configuration - * The configuration for this component. - * @param projectContext - * The project context for this component. - */ - public EventRecordTraceReconstructionFilter( - final Configuration configuration, - final IProjectContext projectContext) { - super(configuration, projectContext); - - final String recordTimeunitProperty = projectContext - .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } - catch (final IllegalArgumentException ex) { // already caught in - // AnalysisController, - // should never happen - LOG.warn(recordTimeunitProperty - + " is no valid TimeUnit! Using NANOSECONDS instead."); - recordTimeunit = TimeUnit.NANOSECONDS; - } - timeunit = recordTimeunit; - - final String configTimeunitProperty = configuration - .getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); - TimeUnit configTimeunit; - try { - configTimeunit = TimeUnit.valueOf(configTimeunitProperty); - } - catch (final IllegalArgumentException ex) { - LOG.warn(configTimeunitProperty - + " is no valid TimeUnit! Using inherited value of " - + timeunit.name() + " instead."); - configTimeunit = timeunit; - } - - maxTraceDuration = timeunit.convert(configuration - .getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION), - configTimeunit); - maxTraceTimeout = timeunit.convert(configuration - .getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT), - configTimeunit); - timeout = !((maxTraceTimeout == Long.MAX_VALUE) && (maxTraceDuration == Long.MAX_VALUE)); - traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); - } - - /** - * This method is the input port for the new events for this filter. - * - * @param record - * The new record to handle. - */ - @InputPort(name = INPUT_PORT_NAME_TRACE_RECORDS, description = "Reconstruct traces from incoming flow records", eventTypes = { - Trace.class, AbstractOperationEvent.class }) - public void newEvent(final IFlowRecord record) { - final Long traceId; - TraceBuffer traceBuffer; - final long loggingTimestamp; - if (record instanceof Trace) { - traceId = ((Trace) record).getTraceId(); - traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { // NOCS (DCL) - traceBuffer = new TraceBuffer(); - traceId2trace.put(traceId, traceBuffer); - } - } - } - traceBuffer.setTrace((Trace) record); - loggingTimestamp = -1; - } - else if (record instanceof AbstractOperationEvent) { - traceId = ((AbstractOperationEvent) record).getTraceId(); - traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = traceId2trace.get(traceId); - if (traceBuffer == null) { // NOCS (DCL) - traceBuffer = new TraceBuffer(); - traceId2trace.put(traceId, traceBuffer); - } - } - } - traceBuffer.insertEvent((AbstractOperationEvent) record); - loggingTimestamp = ((AbstractOperationEvent) record) - .getLoggingTimestamp(); - } - else { - return; // invalid type which should not happen due to the specified - // eventTypes - } - if (traceBuffer.isFinished()) { - synchronized (this) { // has to be synchronized because of timeout - // cleanup - traceId2trace.remove(traceId); - } - super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, - traceBuffer.toTraceEvents()); - } - if (timeout) { - synchronized (this) { - // can we assume a rough order of logging timestamps? (yes, - // except with DB reader) - if (loggingTimestamp > maxEncounteredLoggingTimestamp) { - maxEncounteredLoggingTimestamp = loggingTimestamp; - } - processTimeoutQueue(maxEncounteredLoggingTimestamp); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void terminate(final boolean error) { - synchronized (this) { - for (final Entry<Long, TraceBuffer> entry : traceId2trace - .entrySet()) { - final TraceBuffer traceBuffer = entry.getValue(); - if (traceBuffer.isInvalid()) { - super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID, - traceBuffer.toTraceEvents()); - } - else { - super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, - traceBuffer.toTraceEvents()); - } - } - traceId2trace.clear(); - } - } - - // only called within synchronized! We assume timestamps >= 0 - private void processTimeoutQueue(final long timestamp) { - final long duration = timestamp - maxTraceDuration; - final long traceTimeout = timestamp - maxTraceTimeout; - for (final Iterator<Entry<Long, TraceBuffer>> iterator = traceId2trace - .entrySet().iterator(); iterator.hasNext();) { - final TraceBuffer traceBuffer = iterator.next().getValue(); - if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long - // time - // no see - || (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max - // duration - // is - // gone - if (traceBuffer.isInvalid()) { - super.deliver(OUTPUT_PORT_NAME_TRACE_INVALID, - traceBuffer.toTraceEvents()); - } - else { - super.deliver(OUTPUT_PORT_NAME_TRACE_VALID, - traceBuffer.toTraceEvents()); - } - iterator.remove(); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, - timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, - String.valueOf(maxTraceDuration)); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, - String.valueOf(maxTraceTimeout)); - return configuration; - } - - /** - * The TraceBuffer is synchronized to prevent problems with concurrent - * access. - * - * @author Jan Waller - */ - private static final class TraceBuffer { - private static final Log LOG = LogFactory - .getLog(TraceBuffer.class); - private static final Comparator<AbstractOperationEvent> COMPARATOR = new TraceOperationComperator(); - - private Trace trace; - private final SortedSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( - COMPARATOR); - - private boolean closeable; - private boolean damaged; - private int openEvents; - private int maxOrderIndex = -1; - - private long minLoggingTimestamp = Long.MAX_VALUE; - private long maxLoggingTimestamp = -1; - - private long traceId = -1; - - /** - * Creates a new instance of this class. - */ - public TraceBuffer() { - // default empty constructor - } - - public void insertEvent(final AbstractOperationEvent record) { - final long myTraceId = record.getTraceId(); - synchronized (this) { - if (traceId == -1) { - traceId = myTraceId; - } - else if (traceId != myTraceId) { - LOG.error("Invalid traceId! Expected: " + traceId - + " but found: " + myTraceId + " in event " - + record.toString()); - damaged = true; - } - final long loggingTimestamp = record.getLoggingTimestamp(); - if (loggingTimestamp > maxLoggingTimestamp) { - maxLoggingTimestamp = loggingTimestamp; - } - if (loggingTimestamp < minLoggingTimestamp) { - minLoggingTimestamp = loggingTimestamp; - } - final int orderIndex = record.getOrderIndex(); - if (orderIndex > maxOrderIndex) { - maxOrderIndex = orderIndex; - } - if (record instanceof BeforeOperationEvent) { - if (orderIndex == 0) { - closeable = true; - } - openEvents++; - } - else if (record instanceof AfterOperationEvent) { - openEvents--; - } - else if (record instanceof AfterFailedOperationEvent) { - openEvents--; - } - if (!events.add(record)) { - LOG.error("Duplicate entry for orderIndex " + orderIndex - + " with traceId " + myTraceId); - damaged = true; - } - } - } - - public void setTrace(final Trace trace) { - final long myTraceId = trace.getTraceId(); - synchronized (this) { - if (traceId == -1) { - traceId = myTraceId; - } - else if (traceId != myTraceId) { - LOG.error("Invalid traceId! Expected: " + traceId - + " but found: " + myTraceId + " in trace " - + trace.toString()); - damaged = true; - } - if (this.trace == null) { - this.trace = trace; - } - else { - LOG.error("Duplicate Trace entry for traceId " + myTraceId); - damaged = true; - } - } - } - - public boolean isFinished() { - synchronized (this) { - return closeable && !isInvalid(); - } - } - - public boolean isInvalid() { - synchronized (this) { - return (trace == null) - || damaged - || (openEvents != 0) - || (((maxOrderIndex + 1) != events.size()) || events - .isEmpty()); - } - } - - public TraceEventRecords toTraceEvents() { - synchronized (this) { - return new TraceEventRecords( - trace, - events.toArray(new AbstractOperationEvent[events.size()])); - } - } - - public long getMaxLoggingTimestamp() { - synchronized (this) { - return maxLoggingTimestamp; - } - } - - public long getMinLoggingTimestamp() { - synchronized (this) { - return minLoggingTimestamp; - } - } - - /** - * @author Jan Waller - */ - private static final class TraceOperationComperator implements - Comparator<AbstractOperationEvent> { - public TraceOperationComperator() {} - - public int compare(final AbstractOperationEvent o1, - final AbstractOperationEvent o2) { - return o1.getOrderIndex() - o2.getOrderIndex(); - } - } - } -} diff --git a/src/explorviz/hpc_monitoring/filter/PipelineStartFilter.java b/src/explorviz/hpc_monitoring/filter/PipelineStartFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..4b5752c3bd467345875cc8420d1ce669c23e05a6 --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/PipelineStartFilter.java @@ -0,0 +1,18 @@ +package explorviz.hpc_monitoring.filter; + +import com.lmax.disruptor.EventHandler; +import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; +import explorviz.hpc_monitoring.reader.RecordEvent; + +public class PipelineStartFilter implements EventHandler<RecordEvent> { + + private static final TraceReconstructionFilter reconstructionFilter = new TraceReconstructionFilter( + 5 * 1000 * 1000); + + @Override + public void onEvent(final RecordEvent event, final long sequence, + final boolean endOfBatch) throws Exception { + reconstructionFilter.newEvent(event.getValue()); + } + +} diff --git a/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java b/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java deleted file mode 100644 index 1edd4dd66153bb25f83d39d2894dacf11e090290..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/filter/TraceEventRecordAggregationFilter.java +++ /dev/null @@ -1,263 +0,0 @@ -package explorviz.hpc_monitoring.filter; - -import java.util.*; -import java.util.concurrent.TimeUnit; -import kieker.analysis.IProjectContext; -import kieker.analysis.plugin.annotation.*; -import kieker.analysis.plugin.filter.AbstractFilterPlugin; -import kieker.common.configuration.Configuration; -import explorviz.hpc_monitoring.record.TraceEventRecords; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; - -/** - * This filter collects incoming traces for a specified amount of time. - * Any traces representing the same series of events will be used to calculate - * statistic informations like the average runtime of this kind of trace. - * Only one specimen of these traces containing this information will be - * forwarded - * from this filter. - * - * Statistic outliers regarding the runtime of the trace will be treated special - * and therefore send out as they are and will not be mixed with others. - * - * - * @author Florian Biss - * - * @since 1.8 - */ - -@Plugin(description = "This filter tries to aggregate similar TraceEventRecordss into a single trace.", outputPorts = { @OutputPort(name = TraceEventRecordAggregationFilter.OUTPUT_PORT_NAME_TRACES, description = "Output port for the processed traces", eventTypes = { TraceEventRecords.class }) }, configuration = { - @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), - @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION), - @Property(name = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION, defaultValue = TraceEventRecordAggregationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION) }) -public class TraceEventRecordAggregationFilter extends AbstractFilterPlugin { - /** - * The name of the output port delivering the valid traces. - */ - public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut"; - - /** - * The name of the input port receiving the trace records. - */ - public static final String INPUT_PORT_NAME_TRACES = "tracesIn"; - - /** - * The name of the property determining the time unit. - */ - public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; - - /** - * Clock input for timeout handling. - */ - public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; - - /** - * The default value of the time unit property (nanoseconds). - */ - public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() - - /** - * The name of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration"; - - /** - * The default value of the property determining the maximal trace timeout. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "4000000000"; - - /** - * The name of the property determining the maximal runtime deviation - * factor. - * - * Outliers are indicated by - * <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code> - * . - * Use negative number to aggregate all traces. - */ - public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation"; - - /** - * The default value of the property determining the maximal runtime - * deviation factor. - * Default is two standard deviations. - */ - public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "-1"; - - private final TimeUnit timeunit; - private final long maxCollectionDuration; - private final long maxDeviation; - - private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer; - - public TraceEventRecordAggregationFilter(final Configuration configuration, - final IProjectContext projectContext) { - super(configuration, projectContext); - - final String recordTimeunitProperty = projectContext - .getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); - TimeUnit recordTimeunit; - try { - recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); - } - catch (final IllegalArgumentException ex) { - recordTimeunit = TimeUnit.NANOSECONDS; - } - timeunit = recordTimeunit; - - maxDeviation = configuration - .getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION); - - maxCollectionDuration = timeunit.convert(configuration - .getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION), - timeunit); - trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>( - new TraceComperator()); - } - - @InputPort(name = INPUT_PORT_NAME_TRACES, description = "Collect identical traces and aggregate them.", eventTypes = { TraceEventRecords.class }) - public void newEvent(final TraceEventRecords event) { - synchronized (this) { - insertIntoBuffer(event); - } - } - - private void insertIntoBuffer(final TraceEventRecords trace) { - TraceAggregationBuffer traceBuffer; - traceBuffer = trace2buffer.get(trace); - - if (traceBuffer == null) { // first record for this id! - synchronized (this) { - traceBuffer = trace2buffer.get(trace); - - if (traceBuffer == null) { // NOCS (DCL) - traceBuffer = new TraceAggregationBuffer(System.nanoTime()); - trace2buffer.put(trace, traceBuffer); - } - - } - } - synchronized (this) { - traceBuffer.insertTrace(trace); - } - } - - @InputPort(name = INPUT_PORT_NAME_TIME_EVENT, description = "Time signal for timeouts", eventTypes = { Long.class }) - public void newEvent(final Long timestamp) { - synchronized (this) { - processTimeoutQueue(timestamp); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void terminate(final boolean error) { - synchronized (this) { - for (final TraceAggregationBuffer traceBuffer : trace2buffer - .values()) { - super.deliver(OUTPUT_PORT_NAME_TRACES, - traceBuffer.getAggregatedTrace()); - } - trace2buffer.clear(); - } - } - - private void processTimeoutQueue(final long timestamp) { - final long bufferTimeout = timestamp - maxCollectionDuration; - List<TraceEventRecords> toRemove = new ArrayList<TraceEventRecords>(); - for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { - if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { - TraceEventRecords aggregatedTrace = traceBuffer - .getAggregatedTrace(); - super.deliver(OUTPUT_PORT_NAME_TRACES, aggregatedTrace); - toRemove.add(aggregatedTrace); - } - } - for (TraceEventRecords traceEventRecords : toRemove) { - trace2buffer.remove(traceEventRecords); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getCurrentConfiguration() { - final Configuration configuration = new Configuration(); - configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, - timeunit.name()); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, - String.valueOf(maxCollectionDuration)); - configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION, - String.valueOf(maxDeviation)); - return configuration; - } - - private static final class TraceAggregationBuffer { - private TraceEventRecords accumulator; - - private final long bufferCreatedTimestamp; - - public TraceAggregationBuffer(final long bufferCreatedTimestamp) { - this.bufferCreatedTimestamp = bufferCreatedTimestamp; - } - - public long getBufferCreatedTimestamp() { - return bufferCreatedTimestamp; - } - - public TraceEventRecords getAggregatedTrace() { - return accumulator; - } - - public void insertTrace(final TraceEventRecords trace) { - aggregate(trace); - } - - private void aggregate(final TraceEventRecords trace) { - if (accumulator == null) { - accumulator = trace; - } - else { - final AbstractOperationEvent[] aggregatedRecords = accumulator - .getTraceOperations(); - final AbstractOperationEvent[] records = trace - .getTraceOperations(); - for (int i = 0; i < aggregatedRecords.length; i++) { - aggregatedRecords[i].getRuntime().merge( - records[i].getRuntime()); - } - - accumulator.getRuntime().merge(trace.getRuntime()); - } - } - } - - private static final class TraceComperator implements - Comparator<TraceEventRecords> { - - public TraceComperator() {} - - public int compare(final TraceEventRecords t1, - final TraceEventRecords t2) { - final AbstractOperationEvent[] recordsT1 = t1.getTraceOperations(); - final AbstractOperationEvent[] recordsT2 = t2.getTraceOperations(); - - if ((recordsT1.length - recordsT2.length) != 0) { - return recordsT1.length - recordsT2.length; - } - - final int cmpHostnames = t1.getTrace().getHostname() - .compareTo(t2.getTrace().getHostname()); - if (cmpHostnames != 0) { - return cmpHostnames; - } - - // TODO deep check records - return 0; - } - } - -} diff --git a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java b/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java similarity index 92% rename from src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java rename to src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java index 7988e7468312bece03c79809ee30a3ba1859c1ef..52e62a6309d021cb0b5351377837b1748fd018dc 100644 --- a/src/explorviz/hpc_monitoring/filter/CountingThroughputFilter.java +++ b/src/explorviz/hpc_monitoring/filter/counting/CountingThroughputFilter.java @@ -14,7 +14,7 @@ * limitations under the License. ***************************************************************************/ -package explorviz.hpc_monitoring.filter; +package explorviz.hpc_monitoring.filter.counting; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -31,8 +31,10 @@ public final class CountingThroughputFilter { private volatile long firstTimestampInCurrentInterval = -1; private volatile long lastTimestampInCurrentInterval = -1; + private final String beforeOut; - public CountingThroughputFilter() { + public CountingThroughputFilter(final String beforeOut) { + this.beforeOut = beforeOut; timeunit = TimeUnit.NANOSECONDS; intervalSize = timeunit.convert(1000000000, TimeUnit.NANOSECONDS); } @@ -46,7 +48,7 @@ public final class CountingThroughputFilter { if (firstTimestampInCurrentInterval >= 0) { final long currentCount = currentCountForCurrentInterval .get(); - System.out.println(currentCount); + System.out.println(beforeOut + ": " + currentCount); } firstTimestampInCurrentInterval = startOfTimestampsInterval; diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..f1c60061151fb6fc59c43f90452335984274479c --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java @@ -0,0 +1,131 @@ +package explorviz.hpc_monitoring.filter.reconstruction; + +import java.io.Serializable; +import java.util.*; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.TraceMetadata; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; +import explorviz.hpc_monitoring.record.events.normal.*; + +public class TraceBuffer { + private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator(); + + private TraceMetadata traceMetadata; + private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>( + COMPARATOR); + + private boolean closeable; + private boolean damaged; + + private int openEvents; + + private int maxOrderIndex = -1; + + private long maxLoggingTimestamp = -1; + + public TraceBuffer() { + // default empty constructor + } + + public final long getMaxLoggingTimestamp() { + synchronized (this) { + return maxLoggingTimestamp; + } + } + + public void insertEvent(final AbstractOperationEvent event) { + synchronized (this) { + setMaxLoggingTimestamp(event); + final int orderIndex = setMaxOrderIndex(event); + + if (event instanceof BeforeOperationEvent) { + if (orderIndex == 0) { + closeable = true; + } + openEvents++; + } + else if (event instanceof AfterOperationEvent) { + openEvents--; + } + else if (event instanceof AfterFailedOperationEvent) { + openEvents--; + } + + if (!events.add(event)) { + System.out.println("Duplicate entry for orderIndex " + + orderIndex + " with traceId " + + traceMetadata.getTraceId()); + damaged = true; + } + } + } + + private final void setMaxLoggingTimestamp(final AbstractOperationEvent event) { + final long loggingTimestamp = event.getLoggingTimestamp(); + if (loggingTimestamp > maxLoggingTimestamp) { + maxLoggingTimestamp = loggingTimestamp; + } + } + + private final int setMaxOrderIndex(final AbstractOperationEvent event) { + final int orderIndex = event.getOrderIndex(); + if (orderIndex > maxOrderIndex) { + maxOrderIndex = orderIndex; + } + return orderIndex; + } + + public void setTrace(final TraceMetadata trace) { + synchronized (this) { + if (traceMetadata != null) { + System.out.println("Duplicate Trace entry for traceId " + + trace.getTraceId()); + damaged = true; + return; + } + traceMetadata = trace; + } + } + + public final boolean isFinished() { + synchronized (this) { + return !isInvalid() && closeable; + } + } + + public final boolean isInvalid() { + synchronized (this) { + return (((maxOrderIndex + 1) != events.size()) || events.isEmpty() + || (openEvents != 0) || (traceMetadata == null) || damaged); + } + } + + public final Trace toTrace() { // TODO still very slow! + synchronized (this) { + final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events + .size()]; + final Iterator<AbstractOperationEvent> iterator = events.iterator(); + int index = 0; + while (iterator.hasNext()) { + arrayEvents[index] = iterator.next(); + index++; + } + + return new Trace(traceMetadata, arrayEvents); + } + } + + /** + * @author Jan Waller + */ + private static final class AbstractOperationEventComperator implements + Comparator<AbstractOperationEvent>, Serializable { + private static final long serialVersionUID = 8920737343446332517L; + + @Override + public int compare(final AbstractOperationEvent o1, + final AbstractOperationEvent o2) { + return o1.getOrderIndex() - o2.getOrderIndex(); + } + } +} diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..689bcf92c4bef52d67176a2f1af8bed5b72293c3 --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -0,0 +1,101 @@ +package explorviz.hpc_monitoring.filter.reconstruction; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; +import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.TraceMetadata; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public final class TraceReconstructionFilter { + private static final CountingThroughputFilter counter = new CountingThroughputFilter( + "Reconstructed traces per second"); + + private final long maxTraceTimeout; + + private final ConcurrentMap<Long, TraceBuffer> traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>( + 128, + 0.75f, + 2); + + public TraceReconstructionFilter(final long maxTraceTimeout) { + this.maxTraceTimeout = maxTraceTimeout; + } + + public void periodicTimeSignal(final long timestamp) { + checkForTimeouts(timestamp); + } + + private void checkForTimeouts(final long timestamp) { + final long traceTimeout = timestamp - maxTraceTimeout; + for (final Iterator<Entry<Long, TraceBuffer>> iterator = traceId2trace + .entrySet().iterator(); iterator.hasNext();) { + final TraceBuffer traceBuffer = iterator.next().getValue(); + if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { + sentOutTrace(traceBuffer); + iterator.remove(); + } + } + } + + public void newEvent(final IRecord record) { + if (record instanceof TraceMetadata) { + final TraceMetadata traceMetadata = ((TraceMetadata) record); + + final Long traceId = traceMetadata.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + traceBuffer.setTrace(traceMetadata); + } + else if (record instanceof AbstractOperationEvent) { + final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + + final Long traceId = abstractOperationEvent.getTraceId(); + final TraceBuffer traceBuffer = getBufferForTraceId(traceId); + traceBuffer.insertEvent(abstractOperationEvent); + + if (traceBuffer.isFinished()) { + synchronized (this) { + traceId2trace.remove(traceId); + } + sentOutTrace(traceBuffer); + } + } + } + + private TraceBuffer getBufferForTraceId(final Long traceId) { + TraceBuffer traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = traceId2trace.get(traceId); + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceBuffer(); + traceId2trace.put(traceId, traceBuffer); + } + } + } + return traceBuffer; + } + + public void terminate() { + synchronized (this) { + for (final Entry<Long, TraceBuffer> entry : traceId2trace + .entrySet()) { + sentOutTrace(entry.getValue()); + } + traceId2trace.clear(); + } + } + + private void sentOutTrace(final TraceBuffer traceBuffer) { + if (traceBuffer.isInvalid()) { + // TODO invalid out + counter.inputObjects(traceBuffer.toTrace()); + } + else { + // TODO valid out + counter.inputObjects(traceBuffer.toTrace()); + } + } +} \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceEventRecordAggregationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TraceEventRecordAggregationFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..8d0eac04a70f4a09967ee32b1604ad5740beece7 --- /dev/null +++ b/src/explorviz/hpc_monitoring/filter/reduction/TraceEventRecordAggregationFilter.java @@ -0,0 +1,147 @@ +package explorviz.hpc_monitoring.filter.reduction; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import explorviz.hpc_monitoring.record.Trace; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class TraceEventRecordAggregationFilter { + private final TimeUnit timeunit; + private final long maxCollectionDuration; + + private final Map<Trace, TraceAggregationBuffer> trace2buffer; + + public TraceEventRecordAggregationFilter() { + timeunit = TimeUnit.NANOSECONDS; + + maxCollectionDuration = timeunit.convert(50000000L, timeunit); + trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>( + new TraceComperator()); + } + + public void newEvent(final Trace event) { + synchronized (this) { + insertIntoBuffer(event); + } + } + + private void insertIntoBuffer(final Trace trace) { + TraceAggregationBuffer traceBuffer; + traceBuffer = trace2buffer.get(trace); + + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = trace2buffer.get(trace); + + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceAggregationBuffer(System.nanoTime()); + trace2buffer.put(trace, traceBuffer); + } + + } + } + synchronized (this) { + traceBuffer.insertTrace(trace); + } + } + + public void newEvent(final Long timestamp) { + synchronized (this) { + processTimeoutQueue(timestamp); + } + } + + public void terminate(final boolean error) { + synchronized (this) { + // for (final TraceAggregationBuffer traceBuffer : trace2buffer + // .values()) { + // // super.deliver(OUTPUT_PORT_NAME_TRACES, + // // traceBuffer.getAggregatedTrace()); + // } + trace2buffer.clear(); + } + } + + private void processTimeoutQueue(final long timestamp) { + final long bufferTimeout = timestamp - maxCollectionDuration; + final List<Trace> toRemove = new ArrayList<Trace>(); + for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { + final Trace aggregatedTrace = traceBuffer + .getAggregatedTrace(); + // super.deliver(OUTPUT_PORT_NAME_TRACES, aggregatedTrace); + toRemove.add(aggregatedTrace); + } + } + for (final Trace traceEventRecords : toRemove) { + trace2buffer.remove(traceEventRecords); + } + } + + private static final class TraceAggregationBuffer { + private Trace accumulator; + + private final long bufferCreatedTimestamp; + + public TraceAggregationBuffer(final long bufferCreatedTimestamp) { + this.bufferCreatedTimestamp = bufferCreatedTimestamp; + } + + public long getBufferCreatedTimestamp() { + return bufferCreatedTimestamp; + } + + public Trace getAggregatedTrace() { + return accumulator; + } + + public void insertTrace(final Trace trace) { + aggregate(trace); + } + + private void aggregate(final Trace trace) { + if (accumulator == null) { + accumulator = trace; + } + else { + final AbstractOperationEvent[] aggregatedRecords = accumulator + .getTraceOperations(); + final AbstractOperationEvent[] records = trace + .getTraceOperations(); + for (int i = 0; i < aggregatedRecords.length; i++) { + aggregatedRecords[i].getRuntime().merge( + records[i].getRuntime()); + } + + accumulator.getRuntime().merge(trace.getRuntime()); + } + } + } + + private static final class TraceComperator implements + Comparator<Trace> { + + public TraceComperator() {} + + @Override + public int compare(final Trace t1, + final Trace t2) { + final AbstractOperationEvent[] recordsT1 = t1.getTraceOperations(); + final AbstractOperationEvent[] recordsT2 = t2.getTraceOperations(); + + if ((recordsT1.length - recordsT2.length) != 0) { + return recordsT1.length - recordsT2.length; + } + + final int cmpHostnames = t1.getTraceMetadata().getHostname() + .compareTo(t2.getTraceMetadata().getHostname()); + if (cmpHostnames != 0) { + return cmpHostnames; + } + + // TODO deep check records + return 0; + } + } + +} diff --git a/src/explorviz/hpc_monitoring/reader/ByteArrayEvent.java b/src/explorviz/hpc_monitoring/reader/ByteArrayEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..959decea72a3fbfa142c50cdd15c4697914c621c --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/ByteArrayEvent.java @@ -0,0 +1,36 @@ +package explorviz.hpc_monitoring.reader; + +import com.lmax.disruptor.EventFactory; + +/** + * WARNING: This is a mutable object which will be recycled by the RingBuffer. + * You must take a copy of data it holds + * before the framework recycles it. + */ +public final class ByteArrayEvent { + private byte[] value; + private int length; + + public final byte[] getValue() { + return value; + } + + public void setValue(final byte[] value) { + this.value = value; + } + + public final int getLength() { + return length; + } + + public void setLength(final int length) { + this.length = length; + } + + public final static EventFactory<ByteArrayEvent> EVENT_FACTORY = new EventFactory<ByteArrayEvent>() { + @Override + public ByteArrayEvent newInstance() { + return new ByteArrayEvent(); + } + }; +} \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/reader/TestFilter.java b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java similarity index 79% rename from src/explorviz/hpc_monitoring/reader/TestFilter.java rename to src/explorviz/hpc_monitoring/reader/MessageDistributer.java index 6281074bffb3114cf0c779fe7c21c8b0b50b6715..3fed683e5776d8b2c8ff5c53d1fc2ae4fbbb8705 100644 --- a/src/explorviz/hpc_monitoring/reader/TestFilter.java +++ b/src/explorviz/hpc_monitoring/reader/MessageDistributer.java @@ -1,52 +1,59 @@ package explorviz.hpc_monitoring.reader; -import java.util.Map; -import javolution.util.FastMap; -import kieker.common.record.IMonitoringRecord; +import java.util.concurrent.*; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; import explorviz.hpc_monitoring.byteaccess.UnsafeBits; -import explorviz.hpc_monitoring.filter.CountingThroughputFilter; -import explorviz.hpc_monitoring.record.Trace; -import explorviz.hpc_monitoring.record.events.*; -import explorviz.hpc_monitoring.writer.RecordEvent; +import explorviz.hpc_monitoring.filter.PipelineStartFilter; +import explorviz.hpc_monitoring.record.IRecord; +import explorviz.hpc_monitoring.record.TraceMetadata; +import explorviz.hpc_monitoring.record.events.normal.*; -public class TestFilter implements EventHandler<RecordEvent> { +public class MessageDistributer implements EventHandler<ByteArrayEvent> { - private final CountingThroughputFilter counter = new CountingThroughputFilter(); + private final ConcurrentMap<Integer, String> stringRegistry = new ConcurrentHashMap<Integer, String>( + 16, + 0.75f, + 2); - private final FastMap<Integer, String> stringRegistryInternal = new FastMap<Integer, String>(); - private Map<Integer, String> stringRegistry = stringRegistryInternal - .toImmutable() - .value(); + private byte[] unreadBytes = null; - private byte[] unreadBytes = null; + private final RingBuffer<RecordEvent> ringBuffer; + + @SuppressWarnings("unchecked") + public MessageDistributer() { + final ExecutorService exec = Executors.newCachedThreadPool(); + final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( + RecordEvent.EVENT_FACTORY, 32768, exec); + + final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new PipelineStartFilter(); + disruptor.handleEventsWith(eventHandlers); + ringBuffer = disruptor.start(); + } @Override - public void onEvent(final RecordEvent event, final long sequence, + public void onEvent(final ByteArrayEvent event, final long sequence, final boolean endOfBatch) throws Exception { final byte[] received = event.getValue(); final int receivedLength = event.getLength(); - byte[] messages = null; - int messagesLength = 0; + byte[] messages = received; + int messagesLength = receivedLength; if (unreadBytes != null) { final int unreadBytesLength = unreadBytes.length; - messagesLength = receivedLength + unreadBytesLength; + messagesLength += unreadBytesLength; messages = new byte[messagesLength]; System.arraycopy(unreadBytes, 0, messages, 0, unreadBytesLength); System.arraycopy(received, 0, messages, unreadBytesLength, receivedLength); } - else { - messages = received; - messagesLength = receivedLength; - } unreadBytes = messagesfromByteArray(messages, messagesLength); - counter.inputObjects(messages); } private byte[] messagesfromByteArray(final byte[] b, final int readSize) { @@ -60,7 +67,7 @@ public class TestFilter implements EventHandler<RecordEvent> { final int clazzId = UnsafeBits.getInt(b, offset); offset += 4; - IMonitoringRecord record = null; + IRecord record = null; switch (clazzId) { case 0: { @@ -79,7 +86,7 @@ public class TestFilter implements EventHandler<RecordEvent> { final Integer applicationId = UnsafeBits.getInt(b, offset); offset += 4; - record = new Trace(traceId, + record = new TraceMetadata(traceId, getStringFromRegistry(hostnameId), parentTraceId, parentOrderId, getStringFromRegistry(applicationId)); break; @@ -172,7 +179,7 @@ public class TestFilter implements EventHandler<RecordEvent> { } } - counter.inputObjects(record); + putInRingBuffer(record); } return null; @@ -189,10 +196,15 @@ public class TestFilter implements EventHandler<RecordEvent> { return unreadBytes; } + private void putInRingBuffer(final IRecord record) { + final long hiseq = ringBuffer.next(); + final RecordEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValue(record); + ringBuffer.publish(hiseq); + } + public void addToRegistry(final Integer key, final String value) { - stringRegistryInternal.put(key, value); - stringRegistry = stringRegistryInternal.toImmutable().value(); - System.out.println(value); + stringRegistry.put(key, value); synchronized (this) { notifyAll(); diff --git a/src/explorviz/hpc_monitoring/reader/RecordEvent.java b/src/explorviz/hpc_monitoring/reader/RecordEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..f2312eff38f72901f2f57fdb071116250bf6e1cf --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/RecordEvent.java @@ -0,0 +1,28 @@ +package explorviz.hpc_monitoring.reader; + +import com.lmax.disruptor.EventFactory; +import explorviz.hpc_monitoring.record.IRecord; + +/** + * WARNING: This is a mutable object which will be recycled by the RingBuffer. + * You must take a copy of data it holds + * before the framework recycles it. + */ +public final class RecordEvent { + private IRecord value; + + public final IRecord getValue() { + return value; + } + + public void setValue(final IRecord value) { + this.value = value; + } + + public final static EventFactory<RecordEvent> EVENT_FACTORY = new EventFactory<RecordEvent>() { + @Override + public RecordEvent newInstance() { + return new RecordEvent(); + } + }; +} \ No newline at end of file diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 9165731fbf60270be08554eedfd6a3bccf19a793..fcf6d54e6e340b4b7560ab989e99264d4b1a4c87 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -1,19 +1,3 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - package explorviz.hpc_monitoring.reader; import java.io.BufferedInputStream; @@ -22,42 +6,32 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; +import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; -import explorviz.hpc_monitoring.writer.RecordEvent; public final class TCPReader { - private static final int MESSAGE_BUFFER_SIZE = 65536; - - private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; - - public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "127.0.0.1"; - public static final int CONFIG_PROPERTY_PORT = 10133; - - static final Log LOG = LogFactory - .getLog(TCPReader.class); - - private final int port; + private static final int MESSAGE_BUFFER_SIZE = 65536; + private final byte[] messages = new byte[MESSAGE_BUFFER_SIZE]; - private ServerSocket serversocket; - private boolean active = true; + private final int listeningPort; - private final TestFilter eventHandler1; + private ServerSocket serversocket; + private boolean active = true; - private final RingBuffer<RecordEvent> ringBuffer; + private final RingBuffer<ByteArrayEvent> ringBuffer; - @SuppressWarnings("unchecked") - public TCPReader() throws IllegalArgumentException { - port = 10133; + public TCPReader(final int listeningPort) throws IllegalArgumentException { + this.listeningPort = listeningPort; final ExecutorService exec = Executors.newCachedThreadPool(); - final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>( - RecordEvent.EVENT_FACTORY, 8192, exec); + final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>( + ByteArrayEvent.EVENT_FACTORY, 4096, exec); - eventHandler1 = new TestFilter(); - disruptor.handleEventsWith(eventHandler1); + @SuppressWarnings("unchecked") + final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1]; + eventHandlers[0] = new MessageDistributer(); + disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @@ -79,25 +53,25 @@ public final class TCPReader { } } catch (final IOException ex) { - LOG.error("Error in read()", ex); + System.out.println("Error in read() " + ex.toString()); } finally { try { serversocket.close(); } catch (final IOException e) { - LOG.error("Error in read()", e); + System.out.println("Error in read()" + e.toString()); } } } private void open() throws IOException { - serversocket = new ServerSocket(port); + serversocket = new ServerSocket(listeningPort); } private void putInRingBuffer(final byte[] message, final int readBytes) { final long hiseq = ringBuffer.next(); - final RecordEvent valueEvent = ringBuffer.get(hiseq); + final ByteArrayEvent valueEvent = ringBuffer.get(hiseq); final byte[] toSaveCopy = new byte[readBytes]; System.arraycopy(message, 0, toSaveCopy, 0, readBytes); valueEvent.setValue(toSaveCopy); @@ -106,7 +80,7 @@ public final class TCPReader { } public void terminate(final boolean error) { - LOG.info("Shutdown of TCPReader requested."); + System.out.println("Shutdown of TCPReader requested."); active = false; } } diff --git a/src/explorviz/hpc_monitoring/reader/TimeReader.java b/src/explorviz/hpc_monitoring/reader/TimeReader.java new file mode 100644 index 0000000000000000000000000000000000000000..732999eb79133a481f367ad37bbd9cd34c1467b9 --- /dev/null +++ b/src/explorviz/hpc_monitoring/reader/TimeReader.java @@ -0,0 +1,55 @@ +package explorviz.hpc_monitoring.reader; + +import java.util.concurrent.*; + +public final class TimeReader { + private final long period; + + private boolean terminated = false; + + private final ScheduledExecutorService executorService; + private ScheduledFuture<?> result; + + public TimeReader(final long periodInNanoSec) { + // period = 10 * 1000 * 1000; + period = periodInNanoSec; + executorService = new ScheduledThreadPoolExecutor(1); + } + + public void read() { + result = executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + sendTimestampEvent(); + } + }, 0, period, TimeUnit.NANOSECONDS); + try { + result.get(); + } + catch (final ExecutionException ex) { + terminate(); + } + catch (final InterruptedException ignore) {} + catch (final CancellationException ignore) {} + terminate(); + } + + protected void sendTimestampEvent() { + // final long timestamp = System.nanoTime(); + // super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp); + } + + public void terminate() { + if (!terminated) { + executorService.shutdown(); + try { + terminated = executorService.awaitTermination(5, + TimeUnit.SECONDS); + } + catch (final InterruptedException ex) {} + if (!terminated) { + result.cancel(true); + } + } + } +} diff --git a/src/explorviz/hpc_monitoring/record/IRecord.java b/src/explorviz/hpc_monitoring/record/IRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..1350246777a8093868efb5dc9c35efeefd5507db --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/IRecord.java @@ -0,0 +1,5 @@ +package explorviz.hpc_monitoring.record; + +public interface IRecord { + +} diff --git a/src/explorviz/hpc_monitoring/record/Trace.java b/src/explorviz/hpc_monitoring/record/Trace.java index 2c631c42ea02f9a33b41062386f765399d151bcd..9a6f00158ed05cbe46686534803465f6b7f3117e 100644 --- a/src/explorviz/hpc_monitoring/record/Trace.java +++ b/src/explorviz/hpc_monitoring/record/Trace.java @@ -1,75 +1,89 @@ package explorviz.hpc_monitoring.record; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; - -public class Trace implements IFlowRecord { - private static final long serialVersionUID = 4028885353683254444L; - - private final long traceId; - private final String hostname; - private final long parentTraceId; - private final int parentOrderId; - private final String application; - - public Trace(final long traceId, final String hostname, - final long parentTraceId, final int parentOrderId, - final String application) { - this.traceId = traceId; - this.hostname = hostname; - this.parentTraceId = parentTraceId; - this.parentOrderId = parentOrderId; - this.application = application; +import java.io.Serializable; +import java.util.Arrays; +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public final class Trace implements Serializable, IRecord { + private static final long serialVersionUID = 8589405631073291022L; + + private final TraceMetadata traceMetadata; + private final AbstractOperationEvent[] traceEvents; + private final RuntimeStatisticInformation runtimeInformation; + + public Trace(final TraceMetadata traceMetadata, + final AbstractOperationEvent[] traceEvents) { + this.traceMetadata = traceMetadata; + this.traceEvents = traceEvents; + runtimeInformation = new RuntimeStatisticInformation(1); // TODO } - @Override - public long getLoggingTimestamp() { - throw new UnsupportedOperationException(); + public TraceMetadata getTraceMetadata() { + return traceMetadata; } - @Override - public Class<?>[] getValueTypes() { - throw new UnsupportedOperationException(); + public AbstractOperationEvent[] getTraceOperations() { + return traceEvents; } @Override - public void initFromArray(final Object[] arg0) { - throw new UnsupportedOperationException(); + public String toString() { + final StringBuilder sb = new StringBuilder(64); + sb.append(super.toString()); + sb.append("\n\tTrace: "); + sb.append(traceMetadata); + for (final AbstractOperationEvent traceEvent : traceEvents) { + sb.append("\n\t"); + sb.append(traceEvent.getClass().getSimpleName()); + sb.append(": "); + sb.append(traceEvent); + } + sb.append("\n\t"); + sb.append(runtimeInformation.getClass().getSimpleName()); + sb.append(": "); + sb.append(runtimeInformation); + sb.append('\n'); + return sb.toString(); } @Override - public void setLoggingTimestamp(final long arg0) { - throw new UnsupportedOperationException(); + public int hashCode() { + final int prime = 31; + int result = 1; + result = (prime * result) + + ((traceMetadata == null) ? 0 : traceMetadata.hashCode()); // NOCS + // (?:) + result = (prime * result) + Arrays.hashCode(traceEvents); + return result; } @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (this.getClass() != obj.getClass()) { + return false; + } + final Trace other = (Trace) obj; + if (traceMetadata == null) { + if (other.traceMetadata != null) { + return false; + } + } + else if (!traceMetadata.equals(other.traceMetadata)) { + return false; + } + if (!Arrays.equals(traceEvents, other.traceEvents)) { + return false; + } + return true; } - @Override - public int compareTo(final IMonitoringRecord o) { - throw new UnsupportedOperationException(); - } - - public long getTraceId() { - return traceId; + public RuntimeStatisticInformation getRuntime() { + return runtimeInformation; } - - public String getHostname() { - return hostname; - } - - public long getParentTraceId() { - return parentTraceId; - } - - public int getParentOrderId() { - return parentOrderId; - } - - public String getApplication() { - return application; - } - } diff --git a/src/explorviz/hpc_monitoring/record/TraceEventRecords.java b/src/explorviz/hpc_monitoring/record/TraceEventRecords.java deleted file mode 100644 index a42bab49d59adcc37cd127e8dfa5621eee5469c0..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/record/TraceEventRecords.java +++ /dev/null @@ -1,113 +0,0 @@ -/*************************************************************************** - * Copyright 2013 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package explorviz.hpc_monitoring.record; - -import java.io.Serializable; -import java.util.Arrays; -import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; - -public final class TraceEventRecords implements Serializable { - private static final long serialVersionUID = 8589405631073291022L; - - private final Trace trace; - private final AbstractOperationEvent[] traceEvents; - private final RuntimeStatisticInformation runtimeInformation; - - /** - * Creates a new instance of this class using the given parameters. - * - * @param trace - * The trace to be stored in this object. - * @param traceEvents - * The trace events to be stored in this object. - */ - public TraceEventRecords(final Trace trace, - final AbstractOperationEvent[] traceEvents) { // NOPMD (stored - // directly) - this.trace = trace; - this.traceEvents = traceEvents; - runtimeInformation = new RuntimeStatisticInformation(1); // TODO - } - - public Trace getTrace() { - return trace; - } - - public AbstractOperationEvent[] getTraceOperations() { - return traceEvents; // NOPMD (internal array exposed) - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(64); - sb.append(super.toString()); - sb.append("\n\tTrace: "); - sb.append(trace); - for (final AbstractOperationEvent traceEvent : traceEvents) { - sb.append("\n\t"); - sb.append(traceEvent.getClass().getSimpleName()); - sb.append(": "); - sb.append(traceEvent); - } - sb.append("\n\t"); - sb.append(runtimeInformation.getClass().getSimpleName()); - sb.append(": "); - sb.append(runtimeInformation); - sb.append('\n'); - return sb.toString(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = (prime * result) + ((trace == null) ? 0 : trace.hashCode()); // NOCS - // (?:) - result = (prime * result) + Arrays.hashCode(traceEvents); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (this.getClass() != obj.getClass()) { - return false; - } - final TraceEventRecords other = (TraceEventRecords) obj; - if (trace == null) { - if (other.trace != null) { - return false; - } - } - else if (!trace.equals(other.trace)) { - return false; - } - if (!Arrays.equals(traceEvents, other.traceEvents)) { - return false; - } - return true; - } - - public RuntimeStatisticInformation getRuntime() { - return runtimeInformation; - } -} diff --git a/src/explorviz/hpc_monitoring/record/TraceMetadata.java b/src/explorviz/hpc_monitoring/record/TraceMetadata.java new file mode 100644 index 0000000000000000000000000000000000000000..0a26170e3d81bd8374f034c8f0067624ae998513 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/TraceMetadata.java @@ -0,0 +1,44 @@ +package explorviz.hpc_monitoring.record; + +public class TraceMetadata implements IRecord { + private final long traceId; + private final String hostname; + private final long parentTraceId; + private final int parentOrderId; + private final String application; + + public TraceMetadata(final long traceId, final String hostname, + final long parentTraceId, final int parentOrderId, + final String application) { + this.traceId = traceId; + this.hostname = hostname; + this.parentTraceId = parentTraceId; + this.parentOrderId = parentOrderId; + this.application = application; + } + + public long getLoggingTimestamp() { + throw new UnsupportedOperationException(); + } + + public long getTraceId() { + return traceId; + } + + public String getHostname() { + return hostname; + } + + public long getParentTraceId() { + return parentTraceId; + } + + public int getParentOrderId() { + return parentOrderId; + } + + public String getApplication() { + return application; + } + +} diff --git a/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java index 28953eff4bc6748c16fe9bef951c7f054773b85e..94580cfcb67f7643cae35941a5ff525e02a7eb28 100644 --- a/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java +++ b/src/explorviz/hpc_monitoring/record/events/AbstractOperationEvent.java @@ -1,19 +1,17 @@ package explorviz.hpc_monitoring.record.events; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; +import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.RuntimeStatisticInformation; -public class AbstractOperationEvent implements IFlowRecord { - private static final long serialVersionUID = 1224383944280820758L; +public class AbstractOperationEvent implements IRecord { private final long timestamp; private final long traceId; private final int orderIndex; private final String operationSignature; private final RuntimeStatisticInformation runtimeStatisticInformation; - public AbstractOperationEvent(long timestamp, long traceId, int orderIndex, - String operationSignature) { + public AbstractOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature) { super(); this.timestamp = timestamp; this.traceId = traceId; @@ -22,36 +20,10 @@ public class AbstractOperationEvent implements IFlowRecord { runtimeStatisticInformation = new RuntimeStatisticInformation(timestamp); } - @Override public long getLoggingTimestamp() { return timestamp; } - @Override - public Class<?>[] getValueTypes() { - throw new UnsupportedOperationException(); - } - - @Override - public void initFromArray(Object[] arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public void setLoggingTimestamp(long arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public int compareTo(IMonitoringRecord o) { - throw new UnsupportedOperationException(); - } - public long getTraceId() { return traceId; } diff --git a/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java deleted file mode 100644 index d78b9a0bccc5d79b78fed45d7fb4e8e0d2094668..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/record/events/AfterFailedOperationEvent.java +++ /dev/null @@ -1,18 +0,0 @@ -package explorviz.hpc_monitoring.record.events; - -public class AfterFailedOperationEvent extends AbstractOperationEvent { - private static final long serialVersionUID = -7488112271381329123L; - - private final String cause; - - public AfterFailedOperationEvent(long timestamp, long traceId, - int orderIndex, String operationSignature, String cause) { - super(timestamp, traceId, orderIndex, operationSignature); - - this.cause = cause; - } - - public String getCause() { - return cause; - } -} diff --git a/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java deleted file mode 100644 index 4ccd1c6a0d6b17274226d262931a25013e3c2e74..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/record/events/AfterOperationEvent.java +++ /dev/null @@ -1,10 +0,0 @@ -package explorviz.hpc_monitoring.record.events; - -public class AfterOperationEvent extends AbstractOperationEvent { - private static final long serialVersionUID = 6136529395371343882L; - - public AfterOperationEvent(long timestamp, long traceId, int orderIndex, - String operationSignature) { - super(timestamp, traceId, orderIndex, operationSignature); - } -} diff --git a/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java deleted file mode 100644 index 99cd35081b58f08c3a2581691bbf61254488c3be..0000000000000000000000000000000000000000 --- a/src/explorviz/hpc_monitoring/record/events/BeforeOperationEvent.java +++ /dev/null @@ -1,10 +0,0 @@ -package explorviz.hpc_monitoring.record.events; - -public class BeforeOperationEvent extends AbstractOperationEvent { - private static final long serialVersionUID = 1465085019211054059L; - - public BeforeOperationEvent(long timestamp, long traceId, int orderIndex, - String operationSignature) { - super(timestamp, traceId, orderIndex, operationSignature); - } -} diff --git a/src/explorviz/hpc_monitoring/record/events/normal/AfterFailedOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/normal/AfterFailedOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..16bf14e6cf7fe90c50912ddc11b1afd7a1f5cbc4 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/normal/AfterFailedOperationEvent.java @@ -0,0 +1,19 @@ +package explorviz.hpc_monitoring.record.events.normal; + +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class AfterFailedOperationEvent extends AbstractOperationEvent { + private final String cause; + + public AfterFailedOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature, + final String cause) { + super(timestamp, traceId, orderIndex, operationSignature); + + this.cause = cause; + } + + public String getCause() { + return cause; + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/normal/AfterOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/normal/AfterOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..453cf1865ef7aef89685b10509e069d9309b7ee1 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/normal/AfterOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events.normal; + +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class AfterOperationEvent extends AbstractOperationEvent { + public AfterOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/normal/BeforeOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/normal/BeforeOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..d4d18c115fb0422297bfa084c4d55c8faa69001e --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/normal/BeforeOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events.normal; + +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class BeforeOperationEvent extends AbstractOperationEvent { + public BeforeOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallReceivedOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallReceivedOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..aea28222e7885dd28057a81416c43579e4b8a9d3 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallReceivedOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events.remote; + +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class RemoteCallReceivedOperationEvent extends AbstractOperationEvent { + public RemoteCallReceivedOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallSentOperationEvent.java b/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallSentOperationEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..f5f89b07fb99d437e51a4f9771b57257f7775f08 --- /dev/null +++ b/src/explorviz/hpc_monitoring/record/events/remote/RemoteCallSentOperationEvent.java @@ -0,0 +1,10 @@ +package explorviz.hpc_monitoring.record.events.remote; + +import explorviz.hpc_monitoring.record.events.AbstractOperationEvent; + +public class RemoteCallSentOperationEvent extends AbstractOperationEvent { + public RemoteCallSentOperationEvent(final long timestamp, final long traceId, + final int orderIndex, final String operationSignature) { + super(timestamp, traceId, orderIndex, operationSignature); + } +} diff --git a/src/explorviz/worker/main/WorkerController.java b/src/explorviz/worker/main/WorkerController.java new file mode 100644 index 0000000000000000000000000000000000000000..0e138e4a158ebbc1d1171fd236ccbb708e7f9bc2 --- /dev/null +++ b/src/explorviz/worker/main/WorkerController.java @@ -0,0 +1,11 @@ +package explorviz.worker.main; + +import explorviz.hpc_monitoring.reader.TCPReader; + +public class WorkerController { + + public void start() { + final TCPReader tcpReader = new TCPReader(10133); + tcpReader.read(); + } +} diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend deleted file mode 100644 index add695dcba2d226ef1b73c7ac49d7806af166a47..0000000000000000000000000000000000000000 --- a/src/explorviz/worker/main/WorkerController.xtend +++ /dev/null @@ -1,134 +0,0 @@ -package explorviz.worker.main - -import kieker.analysis.AnalysisController -import kieker.common.configuration.Configuration -import kieker.analysis.IAnalysisController -import kieker.analysis.plugin.filter.forward.TeeFilter - -import kieker.analysis.plugin.reader.timer.TimeReader -import explorviz.hpc_monitoring.filter.EventRecordTraceReconstructionFilter -import explorviz.hpc_monitoring.filter.TraceEventRecordAggregationFilter -import explorviz.hpc_monitoring.reader.TCPReader -import explorviz.hpc_monitoring.filter.CountingThroughputFilter -import explorviz.hpc_monitoring.connector.TCPConnector - -class WorkerController { - - var IAnalysisController analysisInstance - - def startWithCountingRecordsThroughput() { -// analysisInstance = new AnalysisController() - - val tcpReader = initTCPReader() - - tcpReader.read(); - -// val countingThroughputFilter = initCountingThroughputFilter() -// val teeFilter = initTeeFilter() -// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, -// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) -// -// analysisInstance.connect(countingThroughputFilter, -// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, -// TeeFilter::INPUT_PORT_NAME_EVENTS) - } - - def startWithCountingTracesThroughput() { - analysisInstance = new AnalysisController() - - val tcpReader = initTCPReader() - - val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() - val countingThroughputFilter = initCountingThroughputFilter() - val teeFilter = initTeeFilter() - -// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) - -// analysisInstance.connect(eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, countingThroughputFilter, -// CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) -// -// analysisInstance.connect(countingThroughputFilter, -// CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, -// TeeFilter::INPUT_PORT_NAME_EVENTS) - - try { - analysisInstance.run() - } catch (Exception e) { - e.printStackTrace - } - } - - def startNormalWorker() { - analysisInstance = new AnalysisController() - - val tcpReader = initTCPReader() - val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() - - val aggregationFilter = initAggregationFilter() - - val timer = initTimer() - val tcpConnector = initTCPConnector() - -// analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, -// EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) - - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector, - TCPConnector::INPUT_PORT_NAME_INVALID_TRACES) - - analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) - analysisInstance.connect(aggregationFilter, - TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector, - TCPConnector::INPUT_PORT_NAME_VALID_TRACES) - - try { - analysisInstance.run() - } catch (Exception e) { - e.printStackTrace - } - } - - def initTCPReader() { - new TCPReader() - } - - def initEventRecordTraceReconstructionFilter() { - val config = new Configuration() - new EventRecordTraceReconstructionFilter(config, analysisInstance) - } - - def initAggregationFilter() { - val config = new Configuration() - new TraceEventRecordAggregationFilter(config, analysisInstance) - } - - def initTimer() { - val config = new Configuration() - config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, "1000000000") - config.setProperty(TimeReader::CONFIG_PROPERTY_VALUE_DELAY_NS, "0") - new TimeReader(config, analysisInstance) - } - - def initCountingThroughputFilter() { -// val config = new Configuration() -// config.setProperty(CountingThroughputFilter::CONFIG_PROPERTY_NAME_INTERVAL_SIZE, "1000000000") - new CountingThroughputFilter() - } - - def initTeeFilter() { - val config = new Configuration() - new TeeFilter(config, analysisInstance) - } - - def initTCPConnector() { - val config = new Configuration() - config.setProperty(TCPConnector::CONFIG_PROPERTY_NAME_PROVIDER, "127.0.0.1") - new TCPConnector(config, analysisInstance) - } -} diff --git a/src/explorviz/worker/main/WorkerStarter.java b/src/explorviz/worker/main/WorkerStarter.java index 7db7154cce523380f417fad2e7d3488cdd4a8257..3376029ff486be7460f6639ca0b60b11044328f1 100644 --- a/src/explorviz/worker/main/WorkerStarter.java +++ b/src/explorviz/worker/main/WorkerStarter.java @@ -3,6 +3,6 @@ package explorviz.worker.main; public class WorkerStarter { public static void main(String[] args) { - new WorkerController().startWithCountingRecordsThroughput(); + new WorkerController().start(); } }