Skip to content
Snippets Groups Projects
Commit 263fa128 authored by Christian Wulf's avatar Christian Wulf
Browse files

added dur from beginning of cache

parent 983102f3
No related branches found
No related tags found
No related merge requests found
.handlers = java.util.logging.ConsoleHandler
.level= ALL
.level = ALL
java.util.logging.ConsoleHandler.level = INFO
java.util.logging.ConsoleHandler.level = WARNING
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
......
......@@ -17,6 +17,9 @@ package kieker.analysis.stage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
......@@ -45,9 +48,13 @@ public class CacheFilter extends AbstractFilterPlugin {
@Override
public void terminate(final boolean error) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (final Object data : this.cache) {
super.deliver(EmptyPassOnFilter.OUTPUT_PORT_NAME, data);
}
stopWatch.end();
System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.terminate(error);
}
......
......@@ -20,7 +20,10 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
I element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
......
......@@ -2,7 +2,9 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
public class Cache<T> extends ConsumerStage<T, T> {
......@@ -17,9 +19,13 @@ public class Cache<T> extends ConsumerStage<T, T> {
@Override
public void onIsPipelineHead() {
this.logger.debug("Emitting cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
}
stopWatch.end();
System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
}
......
......@@ -9,6 +9,7 @@ public class CountingFilter<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numElementsPassed++;
// this.logger.info("count: " + this.numElementsPassed);
this.send(element);
}
......
......@@ -35,12 +35,13 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
// long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
// long throughputPerMs = this.numPassedElements / diffInMs;
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s");
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
// long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
// long throughputPerSec = this.numPassedElements / diffInSec;
}
private void resetTimestamp() {
......
......@@ -52,7 +52,9 @@ public class Merger<T> extends ConsumerStage<T, T> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
this.execute5(null);
......
......@@ -56,7 +56,7 @@ public class TraceReconstructionAnalysis extends Analysis {
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(50);
clock.setIntervalDelayInMs(100);
return clock;
}
......@@ -64,19 +64,12 @@ public class TraceReconstructionAnalysis extends Analysis {
private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
// final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate(
// false, null);
// create stages
final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository);
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>();
final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>();
// final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>(
// isIMonitoringRecordInRange);
// final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>(
// isOperationExecutionRecordTraceIdPredicate);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ThroughputFilter<IFlowRecord>();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment