Skip to content
Snippets Groups Projects
Commit b106588e authored by Christian Wulf's avatar Christian Wulf
Browse files

added performance analysis code in CollectorSink

parent 8b41ea42
No related branches found
No related tags found
No related merge requests found
...@@ -64,7 +64,7 @@ public class WorkerThread extends Thread { ...@@ -64,7 +64,7 @@ public class WorkerThread extends Thread {
while (this.stageScheduler.isAnyStageActive()) { while (this.stageScheduler.isAnyStageActive()) {
iterations++; iterations++;
// this.iterationStopWatch.start(); this.iterationStopWatch.start();
final IStage stage = this.stageScheduler.get(); final IStage stage = this.stageScheduler.get();
...@@ -77,13 +77,13 @@ public class WorkerThread extends Thread { ...@@ -77,13 +77,13 @@ public class WorkerThread extends Thread {
} }
this.stageScheduler.determineNextStage(stage, executedSuccessfully); this.stageScheduler.determineNextStage(stage, executedSuccessfully);
// this.iterationStopWatch.end(); this.iterationStopWatch.end();
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration(); final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
// schedulingOverheadInNs += schedulingOverhead; schedulingOverheadInNs += schedulingOverhead;
// if ((iterations % 10000) == 0) { if ((iterations % 10000) == 0) {
// this.schedulingOverheadsInNs.add(schedulingOverheadInNs); this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
// schedulingOverheadInNs = 0; schedulingOverheadInNs = 0;
// } }
} }
this.stopWatch.end(); this.stopWatch.end();
......
...@@ -5,12 +5,9 @@ import java.util.List; ...@@ -5,12 +5,9 @@ import java.util.List;
public class Context<S extends IStage> { public class Context<S extends IStage> {
// private final Map<IPipe<Object>, List<Object>> pipesTakenFrom;
// private final Set<IStage> pipesPutTo = new HashSet<IStage>();
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
private static class InputPortContainer { private static class InputPortContainer {
...@@ -24,12 +21,11 @@ public class Context<S extends IStage> { ...@@ -24,12 +21,11 @@ public class Context<S extends IStage> {
private final IOutputPort<S, ?>[] outputPorts; private final IOutputPort<S, ?>[] outputPorts;
// statistics values // statistics values
private int numPushedElements = 0; private long numPushedElements = 0;
private int numTakenElements = 0; private long numTakenElements = 0;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) { public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) {
// this.pipesTakenFrom = this.createPipeMap(allTargetPorts);
this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts()); this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts());
this.outputPorts = new IOutputPort[owningStage.getOutputPorts().size()]; this.outputPorts = new IOutputPort[owningStage.getOutputPorts().size()];
} }
...@@ -56,12 +52,11 @@ public class Context<S extends IStage> { ...@@ -56,12 +52,11 @@ public class Context<S extends IStage> {
associatedPipe.put(object); associatedPipe.put(object);
this.outputPorts[port.getIndex()] = port; this.outputPorts[port.getIndex()] = port;
// this.pipesPutTo.add(associatedPipe.getTargetPort().getOwningStage());
this.numPushedElements++; this.numPushedElements++;
} }
/** /**
* *
* @param inputPort * @param inputPort
* @return * @return
* @since 1.10 * @since 1.10
...@@ -76,7 +71,7 @@ public class Context<S extends IStage> { ...@@ -76,7 +71,7 @@ public class Context<S extends IStage> {
} }
/** /**
* *
* @param inputPort * @param inputPort
* @return * @return
* @since 1.10 * @since 1.10
...@@ -99,10 +94,10 @@ public class Context<S extends IStage> { ...@@ -99,10 +94,10 @@ public class Context<S extends IStage> {
} }
/** /**
* *
* @param inputPort * @param inputPort
* @return * @return
* *
* @since 1.10 * @since 1.10
*/ */
public <T> T read(final IInputPort<S, T> inputPort) { public <T> T read(final IInputPort<S, T> inputPort) {
......
...@@ -16,14 +16,16 @@ ...@@ -16,14 +16,16 @@
package teetime.stage; package teetime.stage;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit;
import teetime.framework.core.AbstractFilter; import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Context; import teetime.framework.core.Context;
import teetime.framework.core.IInputPort; import teetime.framework.core.IInputPort;
import teetime.util.StopWatch;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> { public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
...@@ -51,10 +53,20 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> { ...@@ -51,10 +53,20 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
this.objects.add(object); this.objects.add(object);
if ((this.objects.size() % THRESHOLD) == 0) { if ((this.objects.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.objects.size()); System.out.println("size: " + this.objects.size());
stopWatch.end();
System.out.println("duration: "+TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs())+" ms");
stopWatch.start();
} }
return true; return true;
} }
StopWatch stopWatch=new StopWatch();
@Override
public void onPipelineStarts() throws Exception {
stopWatch.start();
super.onPipelineStarts();
}
public Collection<T> getObjects() { public Collection<T> getObjects() {
return this.objects; return this.objects;
......
...@@ -34,29 +34,13 @@ import teetime.util.StopWatch; ...@@ -34,29 +34,13 @@ import teetime.util.StopWatch;
*/ */
public class ThroughputTimestampAnalysisTest { public class ThroughputTimestampAnalysisTest {
private static final int NUM_OBJECTS_TO_CREATE = 50000; private static final int NUM_OBJECTS_TO_CREATE = 100000;
@Before @Before
public void before() { public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE"); System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
} }
// Using QueuePipes ist 1/3 faster than using MethodCallPipes
// reason:
/*
* MethodCallPipes:
* <ul>
* <li>SchedulingOverhead: 12629 ms
* <li>ExecutedUnsuccessfullyCount: 80300001
* </ul>
*
* QueuePipes:
* <ul>
* <li>SchedulingOverhead: 11337 ms
* <li>ExecutedUnsuccessfullyCount: 804
* </ul>
*/
@Test @Test
public void testWithManyObjects() { public void testWithManyObjects() {
final StopWatch stopWatch = new StopWatch(); final StopWatch stopWatch = new StopWatch();
...@@ -64,7 +48,7 @@ public class ThroughputTimestampAnalysisTest { ...@@ -64,7 +48,7 @@ public class ThroughputTimestampAnalysisTest {
final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis(); final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
analysis.setShouldUseQueue(true); analysis.setShouldUseQueue(true);
analysis.setNumNoopFilters(800); analysis.setNumNoopFilters(8);
analysis.setTimestampObjects(timestampObjects); analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override @Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment