diff --git a/Start Master (testing only).launch b/Start Master (testing only).launch index 92ef4480469f95b69dddcabcc9929f542185459e..803bd3b4486e8ea4853f474e0ca60b1012026170 100644 --- a/Start Master (testing only).launch +++ b/Start Master (testing only).launch @@ -11,5 +11,5 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> -<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/> </launchConfiguration> diff --git a/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java index 06034c7f0551768beb0d8b12abac3e30662b687b..eb46b447c470173a00e2d0119ca3d137af6906d6 100644 --- a/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java +++ b/src/explorviz/live_trace_processing/filter/counting/RecordCountingFilter.java @@ -12,8 +12,8 @@ import explorviz.live_trace_processing.record.trace.Trace; public class RecordCountingFilter extends AbstractFilter implements IRecordCounting { public RecordCountingFilter(final IPipeReceiver receiver) { - super(receiver, 16, 64, "Methodcalls/sec"); - counter.setEnabled(false); + super(receiver, 16, 64, "MethodCalls/10 sec", 1000 * 10); + counter.setEnabled(true); } @Override diff --git a/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java b/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..84e1aaa2c23c5bf26e34e7822f1fa0dcc7329cdf --- /dev/null +++ b/src/explorviz/live_trace_processing/filter/counting/TraceCountingFilter.java @@ -0,0 +1,37 @@ +package explorviz.live_trace_processing.filter.counting; + +import explorviz.live_trace_processing.filter.AbstractFilter; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.misc.TerminateRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.record.trace.Trace; + +public class TraceCountingFilter extends AbstractFilter implements IRecordCounting { + + public TraceCountingFilter(final IPipeReceiver receiver) { + super(receiver, 16, 64, "TraceCalls/10 sec", 1000 * 10); + counter.setEnabled(true); + } + + @Override + public void processRecord(final IRecord record) { + if (record instanceof Trace) { + final Trace trace = (Trace) record; + counter.inputObjectsCount(trace.getCalledTimes()); + deliver(record); + } else if (record instanceof TimedPeriodRecord) { + periodicFlush(record); + // deliver(record); + } else if (record instanceof TerminateRecord) { + terminate(); + deliver(record); + } else { + deliver(record); + } + } + + private void terminate() { + + } +} diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index cb5a78a41ad4c02dc82d1523cefa413a4904e673..9b08c63dd26d9859d71eb8d912ad190020f4c85d 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -24,7 +24,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I public TraceReconstructionFilter(final long maxTraceTimeout, final ITraceReduction traceReduction) { super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE, - Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec"); + Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec", 1000); this.maxTraceTimeout = maxTraceTimeout; } diff --git a/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java b/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java index f12f7dc1487939a896677788864f8c9304809456..b99392e8097192a943ddede9c94be42839a8fc1d 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/AbstractReductionFilter.java @@ -12,7 +12,7 @@ public abstract class AbstractReductionFilter extends AbstractFilter implements public AbstractReductionFilter(final IPipeReceiver receiver, final String counterString) { super(receiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE, - Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString); + Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString, 1000); } public Trace testReduction(final Trace trace) { diff --git a/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java index c5bcd3e7ca3ce7e40731a3f00bb4756320b3f358..d87b91cc079062599f7be5529087e76428aa7616 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/TracesSummarizationFilter.java @@ -27,7 +27,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR public TracesSummarizationFilter(final long maxCollectionDuration, final IPipeReceiver sinkReceiver) { super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE, - Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec"); + Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec", 1000); this.maxCollectionDuration = maxCollectionDuration; } @@ -83,7 +83,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord; abstractBeforeOperationEventRecord.getRuntimeStatisticInformation() - .makeAccumulator(abstractBeforeOperationEventRecord.getObjectId()); + .makeAccumulator(abstractBeforeOperationEventRecord.getObjectId()); } else { abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0); diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java index 61b94e29c1a9996a31cb6b06f745435b613cc967..af2623eee1586506b6986070f1452257535976e7 100644 --- a/src/explorviz/live_trace_processing/main/FilterConfiguration.java +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -4,7 +4,7 @@ import java.util.concurrent.TimeUnit; import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.ConfigurationFactory; -import explorviz.live_trace_processing.filter.ITraceSink; +import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction; import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; import explorviz.live_trace_processing.filter.reduction.ITraceReduction; @@ -13,9 +13,7 @@ import explorviz.live_trace_processing.reader.TCPReader; public class FilterConfiguration { public static void configureAndStartFilters(final Configuration configuration, - final ITraceSink sink) { - // final IRecordCounting recordCounting = new - // RecordCountingFilter(sink); + final IPipeReceiver sink) { final ITraceReduction traceReduction = new TracesSummarizationFilter( TimeUnit.MILLISECONDS.toNanos(990), sink); diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java index c9e6fbed54f52cc6dd892f16b34edab957062384..4ba23be55d5abf84504603023b8f2639871baa0d 100644 --- a/src/explorviz/live_trace_processing/main/WorkerStarter.java +++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java @@ -5,7 +5,9 @@ import java.io.IOException; import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.ConfigurationFactory; import explorviz.live_trace_processing.connector.TCPConnector; -import explorviz.live_trace_processing.filter.ITraceSink; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.filter.counting.RecordCountingFilter; +import explorviz.live_trace_processing.filter.counting.TraceCountingFilter; import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer; public class WorkerStarter { @@ -16,7 +18,7 @@ public class WorkerStarter { final boolean isWorker = configuration .getBooleanProperty(ConfigurationFactory.WORKER_ENABLED); - ITraceSink sink = null; + IPipeReceiver sink = null; if (isWorker) { final TCPConnector connector = new TCPConnector( @@ -26,6 +28,9 @@ public class WorkerStarter { configureLoadBalancerIfEnabled(configuration, connector); sink = connector; + } else { + final RecordCountingFilter recordCountingFilter = new RecordCountingFilter(sink); + sink = new TraceCountingFilter(recordCountingFilter); } FilterConfiguration.configureAndStartFilters(configuration, sink); @@ -42,9 +47,9 @@ public class WorkerStarter { configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999), configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME, 20000), - configuration + configuration .getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP), - tcpConnector); + tcpConnector); } else { try { tcpConnector.connect();