From 54c6472b7d73ab216a367673c35ced5e9934694c Mon Sep 17 00:00:00 2001
From: Florian Fittkau <ffi@informatik.uni-kiel.de>
Date: Fri, 13 Feb 2015 10:38:26 +0100
Subject: [PATCH] method and trace counting

---
 Start Master (testing only).launch            |  2 +-
 .../filter/counting/RecordCountingFilter.java |  4 +-
 .../filter/counting/TraceCountingFilter.java  | 37 +++++++++++++++++++
 .../TraceReconstructionFilter.java            |  2 +-
 .../reduction/AbstractReductionFilter.java    |  2 +-
 .../reduction/TracesSummarizationFilter.java  |  4 +-
 .../main/FilterConfiguration.java             |  6 +--
 .../main/WorkerStarter.java                   | 13 +++++--
 8 files changed, 55 insertions(+), 15 deletions(-)
 create mode 100644 src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java

diff --git a/Start Master (testing only).launch b/Start Master (testing only).launch
index 92ef448..803bd3b 100644
--- a/Start Master (testing only).launch	
+++ b/Start Master (testing only).launch	
@@ -11,5 +11,5 @@
 </listAttribute>
 <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
 <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
-<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
+<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
 </launchConfiguration>
diff --git a/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java
index 06034c7..eb46b44 100644
--- a/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java
+++ b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java
@@ -12,8 +12,8 @@ import explorviz.live_trace_processing.record.trace.Trace;
 public class RecordCountingFilter extends AbstractFilter implements IRecordCounting {
 
 	public RecordCountingFilter(final IPipeReceiver receiver) {
-		super(receiver, 16, 64, "Methodcalls/sec");
-		counter.setEnabled(false);
+		super(receiver, 16, 64, "MethodCalls/10 sec", 1000 * 10);
+		counter.setEnabled(true);
 	}
 
 	@Override
diff --git a/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java b/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java
new file mode 100644
index 0000000..84e1aaa
--- /dev/null
+++ b/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java
@@ -0,0 +1,37 @@
+package explorviz.live_trace_processing.filter.counting;
+
+import explorviz.live_trace_processing.filter.AbstractFilter;
+import explorviz.live_trace_processing.filter.IPipeReceiver;
+import explorviz.live_trace_processing.record.IRecord;
+import explorviz.live_trace_processing.record.misc.TerminateRecord;
+import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
+import explorviz.live_trace_processing.record.trace.Trace;
+
+public class TraceCountingFilter extends AbstractFilter implements IRecordCounting {
+
+	public TraceCountingFilter(final IPipeReceiver receiver) {
+		super(receiver, 16, 64, "TraceCalls/10 sec", 1000 * 10);
+		counter.setEnabled(true);
+	}
+
+	@Override
+	public void processRecord(final IRecord record) {
+		if (record instanceof Trace) {
+			final Trace trace = (Trace) record;
+			counter.inputObjectsCount(trace.getCalledTimes());
+			deliver(record);
+		} else if (record instanceof TimedPeriodRecord) {
+			periodicFlush(record);
+			// deliver(record);
+		} else if (record instanceof TerminateRecord) {
+			terminate();
+			deliver(record);
+		} else {
+			deliver(record);
+		}
+	}
+
+	private void terminate() {
+
+	}
+}
diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
index cb5a78a..9b08c63 100644
--- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
+++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
@@ -24,7 +24,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I
 	public TraceReconstructionFilter(final long maxTraceTimeout,
 			final ITraceReduction traceReduction) {
 		super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
-				Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec");
+				Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec", 1000);
 		this.maxTraceTimeout = maxTraceTimeout;
 	}
 
diff --git a/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java b/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java
index f12f7dc..b99392e 100644
--- a/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java
+++ b/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java
@@ -12,7 +12,7 @@ public abstract class AbstractReductionFilter extends AbstractFilter implements
 
 	public AbstractReductionFilter(final IPipeReceiver receiver, final String counterString) {
 		super(receiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
-				Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString);
+				Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString, 1000);
 	}
 
 	public Trace testReduction(final Trace trace) {
diff --git a/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java
index c5bcd3e..d87b91c 100644
--- a/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java
+++ b/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java
@@ -27,7 +27,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR
 	public TracesSummarizationFilter(final long maxCollectionDuration,
 			final IPipeReceiver sinkReceiver) {
 		super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
-				Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec");
+				Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec", 1000);
 
 		this.maxCollectionDuration = maxCollectionDuration;
 	}
@@ -83,7 +83,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR
 					final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
 
 					abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
-					.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
+							.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
 				} else {
 					abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
 
diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java
index 61b94e2..af2623e 100644
--- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java
+++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java
@@ -4,7 +4,7 @@ import java.util.concurrent.TimeUnit;
 
 import explorviz.live_trace_processing.configuration.Configuration;
 import explorviz.live_trace_processing.configuration.ConfigurationFactory;
-import explorviz.live_trace_processing.filter.ITraceSink;
+import explorviz.live_trace_processing.filter.IPipeReceiver;
 import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
 import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
 import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
@@ -13,9 +13,7 @@ import explorviz.live_trace_processing.reader.TCPReader;
 
 public class FilterConfiguration {
 	public static void configureAndStartFilters(final Configuration configuration,
-			final ITraceSink sink) {
-		// final IRecordCounting recordCounting = new
-		// RecordCountingFilter(sink);
+			final IPipeReceiver sink) {
 
 		final ITraceReduction traceReduction = new TracesSummarizationFilter(
 				TimeUnit.MILLISECONDS.toNanos(990), sink);
diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java
index c9e6fbe..4ba23be 100644
--- a/src/explorviz/live_trace_processing/main/WorkerStarter.java
+++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java
@@ -5,7 +5,9 @@ import java.io.IOException;
 import explorviz.live_trace_processing.configuration.Configuration;
 import explorviz.live_trace_processing.configuration.ConfigurationFactory;
 import explorviz.live_trace_processing.connector.TCPConnector;
-import explorviz.live_trace_processing.filter.ITraceSink;
+import explorviz.live_trace_processing.filter.IPipeReceiver;
+import explorviz.live_trace_processing.filter.counting.RecordCountingFilter;
+import explorviz.live_trace_processing.filter.counting.TraceCountingFilter;
 import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
 
 public class WorkerStarter {
@@ -16,7 +18,7 @@ public class WorkerStarter {
 		final boolean isWorker = configuration
 				.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
 
-		ITraceSink sink = null;
+		IPipeReceiver sink = null;
 
 		if (isWorker) {
 			final TCPConnector connector = new TCPConnector(
@@ -26,6 +28,9 @@ public class WorkerStarter {
 
 			configureLoadBalancerIfEnabled(configuration, connector);
 			sink = connector;
+		} else {
+			final RecordCountingFilter recordCountingFilter = new RecordCountingFilter(sink);
+			sink = new TraceCountingFilter(recordCountingFilter);
 		}
 
 		FilterConfiguration.configureAndStartFilters(configuration, sink);
@@ -42,9 +47,9 @@ public class WorkerStarter {
 					configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
 					configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
 							20000),
-					configuration
+							configuration
 							.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
-					tcpConnector);
+							tcpConnector);
 		} else {
 			try {
 				tcpConnector.connect();
-- 
GitLab