Skip to content
Snippets Groups Projects
Commit 3febc4d2 authored by Christian Wulf's avatar Christian Wulf
Browse files

optimized performance to the maximum by removing all StopWatch calls in

stages and by reducing calls in WorkerThread
parent 543d16d6
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,6 @@ import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.framework.concurrent.ConcurrentWorkStealingPipe;
import teetime.framework.concurrent.ConcurrentWorkStealingPipeFactory;
......@@ -43,7 +42,7 @@ import teetime.util.Pair;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class ConcurrentCountWordsAnalysis extends Analysis {
......@@ -155,21 +154,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -205,21 +208,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
SingleProducerSingleConsumerPipe.connect(merger.outputPort, printingMerger.getNewInputPort());
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(distributor);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -246,21 +253,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(merger);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -316,8 +327,8 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
System.out.println(stage); // NOPMD (Just for example purposes)
}
final long durationInNs = thread.getDurationInNs();
System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
}
for (final WorkerThread thread : this.nonIoThreads) {
......@@ -326,16 +337,16 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
System.out.println(stage); // NOPMD (Just for example purposes)
}
final long durationInNs = thread.getDurationInNs();
System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
if (durationInNs > maxDuration) {
maxDuration = durationInNs;
maxThread = thread;
}
// if (durationInNs > maxDuration) {
// maxDuration = durationInNs;
// maxThread = thread;
// }
}
System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example
// System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example
// purposes)
}
}
......@@ -19,7 +19,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread;
......@@ -33,7 +32,7 @@ import teetime.stage.basic.ObjectProducer;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class ThroughputAnalysis<T> extends Analysis {
......@@ -101,7 +100,6 @@ public class ThroughputAnalysis<T> extends Analysis {
e.printStackTrace();
}
System.out.println("SchedulingOverhead: " + TimeUnit.NANOSECONDS.toMillis(this.workerThread.computeSchedulingOverheadInNs()) + " ms");
}
public int getNumNoopFilters() {
......
......@@ -131,15 +131,24 @@ public class ThroughputTimestampAnalysis extends Analysis {
e.printStackTrace();
}
final long schedulingOverheadInNs = this.workerThread.computeSchedulingOverheadInNs();
final int size = this.workerThread.getSchedulingOverheadsInNs().size();
System.out.println("scheduling overhead times: " + size);
if (size > 0) {
System.out.println("SchedulingOverhead: " + TimeUnit.NANOSECONDS.toMillis(schedulingOverheadInNs) + " ms");
System.out.println("avg overhead of iteration: "
+ TimeUnit.NANOSECONDS.toMillis(schedulingOverheadInNs * 2 / size) + " ms");
System.out.println("ExecutedUnsuccessfullyCount: " + this.workerThread.getExecutedUnsuccessfullyCount());
List<Long> durationPer10000IterationsInNs = workerThread.getDurationPer10000IterationsInNs();
long overallSumInNs = 0;
for (int i = 0; i < durationPer10000IterationsInNs.size(); i++) {
overallSumInNs += durationPer10000IterationsInNs.get(i);
}
long sumInNs = 0;
for (int i = durationPer10000IterationsInNs.size() / 2; i < durationPer10000IterationsInNs.size(); i++) {
sumInNs += durationPer10000IterationsInNs.get(i);
}
System.out.println("Thread iterations: " + workerThread.getIterations() + " times");
System.out.println("Thread execution time: " + TimeUnit.NANOSECONDS.toMillis(overallSumInNs) + " ms");
System.out.println("Thread half duration/iterations: " + sumInNs / (workerThread.getIterations() / 2)
+ " ns/iteration");
System.out.println("Thread unsuccessfully executed stages: " + workerThread.getExecutedUnsuccessfullyCount()
+ " times");
}
public int getNumNoopFilters() {
......
......@@ -19,18 +19,15 @@ package teetime.framework.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class NextStageScheduler implements IStageScheduler {
......@@ -39,11 +36,6 @@ public class NextStageScheduler implements IStageScheduler {
private final Collection<IStage> highestPrioritizedEnabledStages = new ArrayList<IStage>();
private final IStageWorkList workList;
private final StopWatch stopWatch = new StopWatch();
private long durationInNs;
private int iterations;
private final List<Long> durations = new LinkedList<Long>();
public NextStageScheduler(final IPipeline pipeline, final int accessesDeviceId) throws Exception {
// this.workList = new StageWorkList(accessesDeviceId, pipeline.getStages().size());
this.workList = new StageWorkArrayList(pipeline, accessesDeviceId); // faster implementation
......@@ -93,10 +85,6 @@ public class NextStageScheduler implements IStageScheduler {
@Override
public void determineNextStage(final IStage stage, final boolean executedSuccessfully) {
this.iterations++;
this.stopWatch.start();
// final Collection<? extends IStage> outputStages = stage.getContext().getOutputStages();
final IOutputPort<?, ?>[] outputPorts = stage.getContext().getOutputPorts();
if (outputPorts.length > 0) {
......@@ -120,17 +108,5 @@ public class NextStageScheduler implements IStageScheduler {
if (this.workList.isEmpty()) {
this.workList.pushAll(this.highestPrioritizedEnabledStages);
}
this.stopWatch.end();
this.durationInNs += this.stopWatch.getDurationInNs();
if ((this.iterations % 10000) == 0) {
this.durations.add(this.durationInNs);
this.durationInNs = 0;
}
}
public List<Long> getDurations() {
return this.durations;
}
}
......@@ -38,13 +38,10 @@ public class WorkerThread extends Thread {
private final int accessesDeviceId;
private int executedUnsuccessfullyCount;
// statistics
private final StopWatch stopWatch = new StopWatch();
private final StopWatch iterationStopWatch = new StopWatch();
private final StopWatch beforeStageExecutionStopWatch = new StopWatch();
private final StopWatch afterStageExecutionStopWatch = new StopWatch();
private final StopWatch stageExecutionStopWatch = new StopWatch();
private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>();
private long durationInNs;
private final List<Long> durationPer10000IterationsInNs = new LinkedList<Long>();
private int iterations;
public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) {
this.pipeline = pipeline;
......@@ -62,63 +59,62 @@ public class WorkerThread extends Thread {
throw new IllegalStateException(e);
}
long iterations = 0;
long schedulingOverheadInNs = 0;
this.iterations = 0;
this.stopWatch.start();
while (this.stageScheduler.isAnyStageActive()) {
iterations++;
this.iterationStopWatch.start();
this.iterations++;
// this.iterationStopWatch.start();
// beforeStageExecutionStopWatch.start();
// beforeStageExecutionStopWatch.start();
final IStage stage = this.stageScheduler.get();
// beforeStageExecutionStopWatch.end();
// beforeStageExecutionStopWatch.end();
this.startStageExecution(stage);
stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time
// stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time
final boolean executedSuccessfully = stage.execute();
stageExecutionStopWatch.end();
// stageExecutionStopWatch.end();
this.finishStageExecution(stage, executedSuccessfully);
// afterStageExecutionStopWatch.start();
// afterStageExecutionStopWatch.start();
if (this.shouldTerminate) {
this.executeTerminationPolicy(stage, executedSuccessfully);
}
this.stageScheduler.determineNextStage(stage, executedSuccessfully);
// afterStageExecutionStopWatch.end();
// afterStageExecutionStopWatch.end();
this.iterationStopWatch.end();
final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stageExecutionStopWatch.getDurationInNs(); //3198 ms
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs(); //3656 ms
// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //417 ms
// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //503 ms
// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //1214 ms
schedulingOverheadInNs += schedulingOverhead;
// this.iterationStopWatch.end();
// all stop watches are activated
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() -
// stageExecutionStopWatch.getDurationInNs(); //4952
// 6268 -> 5350 (w/o after) -> 4450 (w/o before) -> 3800 (w/o stage)
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs();
// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327
// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416
// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450
// rest: ~2000 (measurement overhead?)
if ((iterations % 10000) == 0) {
this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
schedulingOverheadInNs = 0;
this.stopWatch.end();
this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs());
this.stopWatch.start();
}
}
this.stopWatch.end();
this.durationInNs = this.stopWatch.getDurationInNs();
final List<Long> durations = ((NextStageScheduler) this.stageScheduler).getDurations();
long overallDuration = 0;
for (int i = durations.size() / 2; i < durations.size(); i++) {
overallDuration += durations.get(i);
}
// System.out.println("Scheduler determine next stage (" + (durations.size() / 2) + "): " + TimeUnit.NANOSECONDS.toMillis(overallDuration) + " ms");
this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs());
this.cleanUpDatastructures();
}
private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) {
// System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy + ", executedSuccessfully=" + executedSuccessfully
// System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy +
// ", executedSuccessfully=" + executedSuccessfully
// + ", mayBeDisabled=" + executedStage.mayBeDisabled());
switch (this.terminationPolicy) {
......@@ -172,7 +168,8 @@ public class WorkerThread extends Thread {
return this.pipeline;
}
// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically disables itself if it has no input ports
// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically
// disables itself if it has no input ports
public void terminate(final StageTerminationPolicy terminationPolicyToUse) {
for (final IStage startStage : this.pipeline.getStartStages()) {
startStage.fireSignalClosingToAllInputPorts();
......@@ -195,31 +192,15 @@ public class WorkerThread extends Thread {
return this.executedUnsuccessfullyCount;
}
public List<Long> getSchedulingOverheadsInNs() {
return this.schedulingOverheadsInNs;
public List<Long> getDurationPer10000IterationsInNs() {
return durationPer10000IterationsInNs;
}
/**
* @since 1.10
*/
public long getDurationInNs() {
return this.durationInNs;
public int getIterations() {
return iterations;
}
/**
* Uses the last half of values to compute the scheduling overall overhead in ns
*
* @since 1.10
*/
public long computeSchedulingOverheadInNs() {
final int size = this.schedulingOverheadsInNs.size();
long schedulingOverheadInNs = 0;
for (int i = size / 2; i < size; i++) {
final Long iterationOverhead = this.schedulingOverheadsInNs.get(i);
schedulingOverheadInNs += iterationOverhead;
}
return schedulingOverheadInNs;
}
}
......@@ -20,7 +20,6 @@ import java.util.Collection;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Context;
import teetime.framework.core.IInputPort;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
......@@ -50,22 +49,13 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> {
}
this.objects.add(object);
if ((this.objects.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.objects.size());
// stopWatch.end();
// System.out.println("duration: "+TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs())+" ms");
// stopWatch.start();
}
return true;
}
StopWatch stopWatch=new StopWatch();
@Override
public void onPipelineStarts() throws Exception {
stopWatch.start();
super.onPipelineStarts();
}
public Collection<T> getObjects() {
return this.objects;
......
......@@ -39,6 +39,7 @@ import teetime.util.StopWatch;
public class ThroughputTimestampAnalysisTest {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
......@@ -47,11 +48,13 @@ public class ThroughputTimestampAnalysisTest {
@Test
public void testWithManyObjects() throws IllegalStateException, AnalysisConfigurationException {
System.out.println("Testing kieker with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
analysis.setNumNoopFilters(800);
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
......
......@@ -35,6 +35,7 @@ import teetime.util.StopWatch;
public class ThroughputTimestampAnalysisTest {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
......@@ -43,12 +44,14 @@ public class ThroughputTimestampAnalysisTest {
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
analysis.setShouldUseQueue(true);
analysis.setNumNoopFilters(800); // 4+n
analysis.setNumNoopFilters(NUM_NOOP_FILTERS); // 4+n
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment