diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index d5604815052962f91eac5eb77f7e9e4b857052b0..389c22c8011b05d6f91056140029f1c63e3571fd 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -30,7 +30,7 @@ import kieker.common.configuration.Configuration; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; import kieker.common.record.IMonitoringRecord; -import explorviz.hpc_monitoring.Bits; +import explorviz.hpc_monitoring.UnsafeBits; import explorviz.hpc_monitoring.record.Trace; import explorviz.hpc_monitoring.record.events.*; @@ -210,7 +210,7 @@ public final class TCPReader extends AbstractReaderPlugin { return createUnreadBytesArray(b, readSize, offset, false); } - final int clazzId = Bits.getInt(b, offset); + final int clazzId = UnsafeBits.getInt(b, offset); offset += 4; switch (clazzId) { @@ -245,13 +245,13 @@ public final class TCPReader extends AbstractReaderPlugin { switch (clazzId) { case 0: { - final long traceId = Bits.getLong(b, offset); + final long traceId = UnsafeBits.getLong(b, offset); offset += 8; - final Integer hostnameId = Bits.getInt(b, offset); + final Integer hostnameId = UnsafeBits.getInt(b, offset); offset += 4; - final long parentTraceId = Bits.getLong(b, offset); + final long parentTraceId = UnsafeBits.getLong(b, offset); offset += 8; - final int parentOrderId = Bits.getInt(b, offset); + final int parentOrderId = UnsafeBits.getInt(b, offset); offset += 4; record = new Trace(traceId, @@ -260,13 +260,13 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 1: { - final long timestamp = Bits.getLong(b, offset); + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; - final long traceId = Bits.getLong(b, offset); + final long traceId = UnsafeBits.getLong(b, offset); offset += 8; - final int orderIndex = Bits.getInt(b, offset); + final int orderIndex = UnsafeBits.getInt(b, offset); offset += 4; - final Integer operationId = Bits.getInt(b, offset); + final Integer operationId = UnsafeBits.getInt(b, offset); offset += 4; record = new BeforeOperationEvent(timestamp, traceId, @@ -274,15 +274,15 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 2: { - final long timestamp = Bits.getLong(b, offset); + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; - final long traceId = Bits.getLong(b, offset); + final long traceId = UnsafeBits.getLong(b, offset); offset += 8; - final int orderIndex = Bits.getInt(b, offset); + final int orderIndex = UnsafeBits.getInt(b, offset); offset += 4; - final Integer operationId = Bits.getInt(b, offset); + final Integer operationId = UnsafeBits.getInt(b, offset); offset += 4; - final Integer causeId = Bits.getInt(b, offset); + final Integer causeId = UnsafeBits.getInt(b, offset); offset += 4; record = new AfterFailedOperationEvent(timestamp, traceId, @@ -291,13 +291,13 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 3: { - final long timestamp = Bits.getLong(b, offset); + final long timestamp = UnsafeBits.getLong(b, offset); offset += 8; - final long traceId = Bits.getLong(b, offset); + final long traceId = UnsafeBits.getLong(b, offset); offset += 8; - final int orderIndex = Bits.getInt(b, offset); + final int orderIndex = UnsafeBits.getInt(b, offset); offset += 4; - final Integer operationId = Bits.getInt(b, offset); + final Integer operationId = UnsafeBits.getInt(b, offset); offset += 4; record = new AfterOperationEvent(timestamp, traceId, @@ -305,9 +305,9 @@ public final class TCPReader extends AbstractReaderPlugin { break; } case 4: { - final Integer mapId = Bits.getInt(b, offset); + final Integer mapId = UnsafeBits.getInt(b, offset); offset += 4; - final int stringLength = Bits.getInt(b, offset); + final int stringLength = UnsafeBits.getInt(b, offset); offset += 4; if ((readSize - offset) < stringLength) { diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend index fb8b78f836f794f3a1e8576ea86997ced7805f0f..05b43cafabd867a150fa4e90f88308bd4c682c84 100644 --- a/src/explorviz/worker/main/WorkerController.xtend +++ b/src/explorviz/worker/main/WorkerController.xtend @@ -19,24 +19,26 @@ class WorkerController { analysisInstance = new AnalysisController() val tcpReader = initTCPReader() - val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter() - val aggregationFilter = initAggregationFilter() - val timer = initTimer() - val tcpConnector = initTCPConnector() - analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS) + val countingThroughputFilter = initCountingThroughputFilter() + val teeFilter = initTeeFilter() + analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter, + CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS) + + analysisInstance.connect(countingThroughputFilter, + CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter, + TeeFilter::INPUT_PORT_NAME_EVENTS) - analysisInstance.connect(eventTraceReconstructionFilter, - EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) +// analysisInstance.connect(eventTraceReconstructionFilter, +// EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter, +// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES) // analysisInstance.connect(eventTraceReconstructionFilter, // EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector, // RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES) - - analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, - TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) +// +// analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter, +// TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT) // analysisInstance.connect(aggregationFilter, // TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector, // RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)