Skip to content
Snippets Groups Projects
Commit 54c6472b authored by Florian Fittkau's avatar Florian Fittkau
Browse files

method and trace counting

parent 96b57e0a
No related branches found
No related tags found
No related merge requests found
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
</launchConfiguration>
......@@ -12,8 +12,8 @@ import explorviz.live_trace_processing.record.trace.Trace;
public class RecordCountingFilter extends AbstractFilter implements IRecordCounting {
public RecordCountingFilter(final IPipeReceiver receiver) {
super(receiver, 16, 64, "Methodcalls/sec");
counter.setEnabled(false);
super(receiver, 16, 64, "MethodCalls/10 sec", 1000 * 10);
counter.setEnabled(true);
}
@Override
......
package explorviz.live_trace_processing.filter.counting;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public class TraceCountingFilter extends AbstractFilter implements IRecordCounting {
public TraceCountingFilter(final IPipeReceiver receiver) {
super(receiver, 16, 64, "TraceCalls/10 sec", 1000 * 10);
counter.setEnabled(true);
}
@Override
public void processRecord(final IRecord record) {
if (record instanceof Trace) {
final Trace trace = (Trace) record;
counter.inputObjectsCount(trace.getCalledTimes());
deliver(record);
} else if (record instanceof TimedPeriodRecord) {
periodicFlush(record);
// deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
} else {
deliver(record);
}
}
private void terminate() {
}
}
......@@ -24,7 +24,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I
public TraceReconstructionFilter(final long maxTraceTimeout,
final ITraceReduction traceReduction) {
super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec");
Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec", 1000);
this.maxTraceTimeout = maxTraceTimeout;
}
......
......@@ -12,7 +12,7 @@ public abstract class AbstractReductionFilter extends AbstractFilter implements
public AbstractReductionFilter(final IPipeReceiver receiver, final String counterString) {
super(receiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString);
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString, 1000);
}
public Trace testReduction(final Trace trace) {
......
......@@ -27,7 +27,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR
public TracesSummarizationFilter(final long maxCollectionDuration,
final IPipeReceiver sinkReceiver) {
super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec");
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec", 1000);
this.maxCollectionDuration = maxCollectionDuration;
}
......@@ -83,7 +83,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
......
......@@ -4,7 +4,7 @@ import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.ITraceSink;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
......@@ -13,9 +13,7 @@ import explorviz.live_trace_processing.reader.TCPReader;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final ITraceSink sink) {
// final IRecordCounting recordCounting = new
// RecordCountingFilter(sink);
final IPipeReceiver sink) {
final ITraceReduction traceReduction = new TracesSummarizationFilter(
TimeUnit.MILLISECONDS.toNanos(990), sink);
......
......@@ -5,7 +5,9 @@ import java.io.IOException;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.filter.ITraceSink;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.counting.RecordCountingFilter;
import explorviz.live_trace_processing.filter.counting.TraceCountingFilter;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class WorkerStarter {
......@@ -16,7 +18,7 @@ public class WorkerStarter {
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
ITraceSink sink = null;
IPipeReceiver sink = null;
if (isWorker) {
final TCPConnector connector = new TCPConnector(
......@@ -26,6 +28,9 @@ public class WorkerStarter {
configureLoadBalancerIfEnabled(configuration, connector);
sink = connector;
} else {
final RecordCountingFilter recordCountingFilter = new RecordCountingFilter(sink);
sink = new TraceCountingFilter(recordCountingFilter);
}
FilterConfiguration.configureAndStartFilters(configuration, sink);
......@@ -42,9 +47,9 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector);
tcpConnector);
} else {
try {
tcpConnector.connect();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment