Skip to content
Snippets Groups Projects
Commit 482bba80 authored by Christian Wulf's avatar Christian Wulf
Browse files

added performance analysis code in CollectorSink

parent 6e8e30f3
No related branches found
No related tags found
No related merge requests found
......@@ -64,7 +64,7 @@ public class WorkerThread extends Thread {
while (this.stageScheduler.isAnyStageActive()) {
iterations++;
// this.iterationStopWatch.start();
this.iterationStopWatch.start();
final IStage stage = this.stageScheduler.get();
......@@ -77,13 +77,13 @@ public class WorkerThread extends Thread {
}
this.stageScheduler.determineNextStage(stage, executedSuccessfully);
// this.iterationStopWatch.end();
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
// schedulingOverheadInNs += schedulingOverhead;
// if ((iterations % 10000) == 0) {
// this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
// schedulingOverheadInNs = 0;
// }
this.iterationStopWatch.end();
final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
schedulingOverheadInNs += schedulingOverhead;
if ((iterations % 10000) == 0) {
this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
schedulingOverheadInNs = 0;
}
}
this.stopWatch.end();
......
......@@ -5,9 +5,6 @@ import java.util.List;
public class Context<S extends IStage> {
// private final Map<IPipe<Object>, List<Object>> pipesTakenFrom;
// private final Set<IStage> pipesPutTo = new HashSet<IStage>();
/**
* @author Christian Wulf
*
......@@ -24,12 +21,11 @@ public class Context<S extends IStage> {
private final IOutputPort<S, ?>[] outputPorts;
// statistics values
private int numPushedElements = 0;
private int numTakenElements = 0;
private long numPushedElements = 0;
private long numTakenElements = 0;
@SuppressWarnings("unchecked")
public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) {
// this.pipesTakenFrom = this.createPipeMap(allTargetPorts);
this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts());
this.outputPorts = new IOutputPort[owningStage.getOutputPorts().size()];
}
......@@ -56,7 +52,6 @@ public class Context<S extends IStage> {
associatedPipe.put(object);
this.outputPorts[port.getIndex()] = port;
// this.pipesPutTo.add(associatedPipe.getTargetPort().getOwningStage());
this.numPushedElements++;
}
......
......@@ -16,10 +16,12 @@
package teetime.stage;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Context;
import teetime.framework.core.IInputPort;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
......@@ -51,10 +53,20 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
this.objects.add(object);
if ((this.objects.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.objects.size());
stopWatch.end();
System.out.println("duration: "+TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs())+" ms");
stopWatch.start();
}
return true;
}
StopWatch stopWatch=new StopWatch();
@Override
public void onPipelineStarts() throws Exception {
stopWatch.start();
super.onPipelineStarts();
}
public Collection<T> getObjects() {
return this.objects;
......
......@@ -34,29 +34,13 @@ import teetime.util.StopWatch;
*/
public class ThroughputTimestampAnalysisTest {
private static final int NUM_OBJECTS_TO_CREATE = 50000;
private static final int NUM_OBJECTS_TO_CREATE = 100000;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
// Using QueuePipes ist 1/3 faster than using MethodCallPipes
// reason:
/*
* MethodCallPipes:
* <ul>
* <li>SchedulingOverhead: 12629 ms
* <li>ExecutedUnsuccessfullyCount: 80300001
* </ul>
*
* QueuePipes:
* <ul>
* <li>SchedulingOverhead: 11337 ms
* <li>ExecutedUnsuccessfullyCount: 804
* </ul>
*/
@Test
public void testWithManyObjects() {
final StopWatch stopWatch = new StopWatch();
......@@ -64,7 +48,7 @@ public class ThroughputTimestampAnalysisTest {
final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
analysis.setShouldUseQueue(true);
analysis.setNumNoopFilters(800);
analysis.setNumNoopFilters(8);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment