From b1b8e899f664616fe1bc669608418d1b357a67eb Mon Sep 17 00:00:00 2001
From: Florian Fittkau <ffi@informatik.uni-kiel.de>
Date: Sun, 11 Aug 2013 23:41:23 +0200
Subject: [PATCH] UnsafeBits try

---
 .../hpc_monitoring/reader/TCPReader.java      | 42 +++++++++----------
 .../worker/main/WorkerController.xtend        | 26 ++++++------
 2 files changed, 35 insertions(+), 33 deletions(-)

diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java
index d560481..389c22c 100644
--- a/src/explorviz/hpc_monitoring/reader/TCPReader.java
+++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java
@@ -30,7 +30,7 @@ import kieker.common.configuration.Configuration;
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 import kieker.common.record.IMonitoringRecord;
-import explorviz.hpc_monitoring.Bits;
+import explorviz.hpc_monitoring.UnsafeBits;
 import explorviz.hpc_monitoring.record.Trace;
 import explorviz.hpc_monitoring.record.events.*;
 
@@ -210,7 +210,7 @@ public final class TCPReader extends AbstractReaderPlugin {
                 return createUnreadBytesArray(b, readSize, offset, false);
             }
 
-            final int clazzId = Bits.getInt(b, offset);
+            final int clazzId = UnsafeBits.getInt(b, offset);
             offset += 4;
 
             switch (clazzId) {
@@ -245,13 +245,13 @@ public final class TCPReader extends AbstractReaderPlugin {
 
             switch (clazzId) {
                 case 0: {
-                    final long traceId = Bits.getLong(b, offset);
+                    final long traceId = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final Integer hostnameId = Bits.getInt(b, offset);
+                    final Integer hostnameId = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final long parentTraceId = Bits.getLong(b, offset);
+                    final long parentTraceId = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final int parentOrderId = Bits.getInt(b, offset);
+                    final int parentOrderId = UnsafeBits.getInt(b, offset);
                     offset += 4;
 
                     record = new Trace(traceId,
@@ -260,13 +260,13 @@ public final class TCPReader extends AbstractReaderPlugin {
                     break;
                 }
                 case 1: {
-                    final long timestamp = Bits.getLong(b, offset);
+                    final long timestamp = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final long traceId = Bits.getLong(b, offset);
+                    final long traceId = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final int orderIndex = Bits.getInt(b, offset);
+                    final int orderIndex = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final Integer operationId = Bits.getInt(b, offset);
+                    final Integer operationId = UnsafeBits.getInt(b, offset);
                     offset += 4;
 
                     record = new BeforeOperationEvent(timestamp, traceId,
@@ -274,15 +274,15 @@ public final class TCPReader extends AbstractReaderPlugin {
                     break;
                 }
                 case 2: {
-                    final long timestamp = Bits.getLong(b, offset);
+                    final long timestamp = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final long traceId = Bits.getLong(b, offset);
+                    final long traceId = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final int orderIndex = Bits.getInt(b, offset);
+                    final int orderIndex = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final Integer operationId = Bits.getInt(b, offset);
+                    final Integer operationId = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final Integer causeId = Bits.getInt(b, offset);
+                    final Integer causeId = UnsafeBits.getInt(b, offset);
                     offset += 4;
 
                     record = new AfterFailedOperationEvent(timestamp, traceId,
@@ -291,13 +291,13 @@ public final class TCPReader extends AbstractReaderPlugin {
                     break;
                 }
                 case 3: {
-                    final long timestamp = Bits.getLong(b, offset);
+                    final long timestamp = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final long traceId = Bits.getLong(b, offset);
+                    final long traceId = UnsafeBits.getLong(b, offset);
                     offset += 8;
-                    final int orderIndex = Bits.getInt(b, offset);
+                    final int orderIndex = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final Integer operationId = Bits.getInt(b, offset);
+                    final Integer operationId = UnsafeBits.getInt(b, offset);
                     offset += 4;
 
                     record = new AfterOperationEvent(timestamp, traceId,
@@ -305,9 +305,9 @@ public final class TCPReader extends AbstractReaderPlugin {
                     break;
                 }
                 case 4: {
-                    final Integer mapId = Bits.getInt(b, offset);
+                    final Integer mapId = UnsafeBits.getInt(b, offset);
                     offset += 4;
-                    final int stringLength = Bits.getInt(b, offset);
+                    final int stringLength = UnsafeBits.getInt(b, offset);
                     offset += 4;
 
                     if ((readSize - offset) < stringLength) {
diff --git a/src/explorviz/worker/main/WorkerController.xtend b/src/explorviz/worker/main/WorkerController.xtend
index fb8b78f..05b43ca 100644
--- a/src/explorviz/worker/main/WorkerController.xtend
+++ b/src/explorviz/worker/main/WorkerController.xtend
@@ -19,24 +19,26 @@ class WorkerController {
         analysisInstance = new AnalysisController()
 
         val tcpReader = initTCPReader()
-        val eventTraceReconstructionFilter = initEventRecordTraceReconstructionFilter()
-        val aggregationFilter = initAggregationFilter()
-        val timer = initTimer()
-        val tcpConnector = initTCPConnector()
 
-        analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, eventTraceReconstructionFilter,
-            EventRecordTraceReconstructionFilter::INPUT_PORT_NAME_TRACE_RECORDS)
+                val countingThroughputFilter = initCountingThroughputFilter()
+                val teeFilter = initTeeFilter()
+                        analysisInstance.connect(tcpReader, TCPReader::OUTPUT_PORT_NAME_RECORDS, countingThroughputFilter,
+                    CountingThroughputFilter::INPUT_PORT_NAME_OBJECTS)
+        
+                analysisInstance.connect(countingThroughputFilter,
+                    CountingThroughputFilter::OUTPUT_PORT_NAME_THROUGHPUT, teeFilter,
+                    TeeFilter::INPUT_PORT_NAME_EVENTS)
 
-        analysisInstance.connect(eventTraceReconstructionFilter,
-            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
-            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
+//        analysisInstance.connect(eventTraceReconstructionFilter,
+//            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_VALID, aggregationFilter,
+//            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TRACES)
 
 //        analysisInstance.connect(eventTraceReconstructionFilter,
 //            EventRecordTraceReconstructionFilter::OUTPUT_PORT_NAME_TRACE_INVALID, tcpConnector,
 //            RabbitMQConnector::INPUT_PORT_NAME_INVALID_TRACES)
-
-        analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
-            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
+//
+//        analysisInstance.connect(timer, TimeReader::OUTPUT_PORT_NAME_TIMESTAMPS, aggregationFilter,
+//            TraceEventRecordAggregationFilter::INPUT_PORT_NAME_TIME_EVENT)
 //        analysisInstance.connect(aggregationFilter,
 //            TraceEventRecordAggregationFilter::OUTPUT_PORT_NAME_TRACES, tcpConnector,
 //            RabbitMQConnector::INPUT_PORT_NAME_VALID_TRACES)
-- 
GitLab