Skip to content
Snippets Groups Projects
Commit d4eb7172 authored by Christian Wulf's avatar Christian Wulf
Browse files

worked on the different performance results

parent b106588e
No related branches found
No related tags found
No related merge requests found
Showing
with 68 additions and 63 deletions
......@@ -33,7 +33,7 @@ import teetime.util.Pair;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class CountWordsAnalysis extends Analysis {
......@@ -100,21 +100,25 @@ public class CountWordsAnalysis extends Analysis {
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends AbstractFilter<?>> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -137,7 +141,7 @@ public class CountWordsAnalysis extends Analysis {
for (final IStage stage : analysis.pipeline.getStages()) {
if (stage instanceof AbstractFilter<?>) {
System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
}
}
......
......@@ -36,7 +36,7 @@ import teetime.util.Pair;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class QueuedCountWordsAnalysis extends Analysis {
......@@ -102,21 +102,25 @@ public class QueuedCountWordsAnalysis extends Analysis {
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -145,7 +149,7 @@ public class QueuedCountWordsAnalysis extends Analysis {
for (final IStage stage : pipeline.getStages()) {
if (stage instanceof AbstractFilter<?>) {
System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
}
}
......
......@@ -33,7 +33,7 @@ import teetime.stage.composite.CycledCountingFilter;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class CountingObjectsAnalysis extends Analysis {
......@@ -69,21 +69,25 @@ public class CountingObjectsAnalysis extends Analysis {
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
......@@ -127,7 +131,7 @@ public class CountingObjectsAnalysis extends Analysis {
for (final IStage stage : analysis.pipeline.getStages()) {
if (stage instanceof AbstractFilter<?>) {
System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
}
}
......
......@@ -40,7 +40,7 @@ import teetime.stage.kieker.className.ClassNameRegistryRepository;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class RecordReaderAnalysis extends Analysis {
......@@ -152,7 +152,7 @@ public class RecordReaderAnalysis extends Analysis {
for (final IStage stage : pipeline.getStages()) {
if (stage instanceof AbstractFilter<?>) {
System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
}
}
}
......
......@@ -37,8 +37,10 @@ public class WorkerThread extends Thread {
private volatile boolean shouldTerminate = false;
private final int accessesDeviceId;
private int executedUnsuccessfullyCount;
private final StopWatch stopWatch = new StopWatch();
private final StopWatch iterationStopWatch = new StopWatch();
private final StopWatch stageExecutionStopWatch = new StopWatch();
private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>();
private long durationInNs;
......@@ -69,7 +71,9 @@ public class WorkerThread extends Thread {
final IStage stage = this.stageScheduler.get();
this.startStageExecution(stage);
// stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time
final boolean executedSuccessfully = stage.execute();
// stageExecutionStopWatch.end();
this.finishStageExecution(stage, executedSuccessfully);
if (this.shouldTerminate) {
......@@ -78,7 +82,7 @@ public class WorkerThread extends Thread {
this.stageScheduler.determineNextStage(stage, executedSuccessfully);
this.iterationStopWatch.end();
final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stageExecutionStopWatch.getDurationInNs();
schedulingOverheadInNs += schedulingOverhead;
if ((iterations % 10000) == 0) {
this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
......
......@@ -22,18 +22,17 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import teetime.util.StopWatch;
import teetime.util.concurrent.workstealing.exception.DequePopException;
/**
*
*
* @author Christian Wulf
*
*
* @since 1.10
*
*
* @param <S>
* the extending stage
*
*
*/
public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource, IPortListener<S> {
......@@ -41,7 +40,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public enum StageState {
......@@ -66,12 +65,14 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
private Context<S> context;
private final IPipeCommand closeCommand = new IPipeCommand() {
@Override
public void execute(final IPipe<?> pipe) throws Exception {
pipe.close();
}
};
private final IPipeCommand pipelineStartsCommand = new IPipeCommand() {
@Override
public void execute(final IPipe<?> pipe) throws Exception {
pipe.notifyPipelineStarts();
}
......@@ -83,20 +84,18 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
*/
private int accessesDeviceId = 0;
private final StopWatch stopWatch = new StopWatch();
private long overallDurationInNs = 0;
private long lastDuration;
@Override
public int getAccessesDeviceId() {
return this.accessesDeviceId;
}
@Override
public void setAccessesDeviceId(final int accessesDeviceId) {
this.accessesDeviceId = accessesDeviceId;
}
// BETTER return a limited context that allows "read" only
@Override
public Context<S> getContext() {
return this.context;
}
......@@ -104,10 +103,11 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public final boolean execute() {
boolean success = false;
try {
success = this.executeLogged(this.context);
success = this.execute(this.context);
if (success) { // deprecated boolean return value
this.context.clear();
} else {
......@@ -121,20 +121,9 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
return success;
}
private boolean executeLogged(final Context<S> context) {
this.stopWatch.start();
try {
final boolean success = this.execute(context);
return success;
} finally {
this.stopWatch.end();
this.lastDuration = this.stopWatch.getDurationInNs();
this.overallDurationInNs += this.lastDuration;
}
}
protected abstract boolean execute(Context<S> context);
@Override
public final void notifyPipelineStarts() throws Exception {
if (this.state == StageState.UNINITIALIZED) {
this.state = StageState.PIPELINE_STARTED;
......@@ -145,7 +134,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* This method is called exactly once iff the pipeline is started.
*
*
* @throws Exception
* @since 1.10
*/
......@@ -156,6 +145,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public void notifyOutputPipes(final IPipeCommand pipeCommand) throws Exception {
for (final IOutputPort<S, ?> outputPort : this.readOnlyOutputPorts) {
final IPipe<?> associatedPipe = outputPort.getAssociatedPipe();
......@@ -165,6 +155,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
}
}
@Override
public final void notifyPipelineStops() {
if (this.state != StageState.PIPELINE_STOPPED) {
this.state = StageState.PIPELINE_STOPPED;
......@@ -174,7 +165,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* This method is called exactly once iff the pipeline is stopped.
*
*
* @since 1.10
*/
public void onPipelineStops() {
......@@ -184,6 +175,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public void onPortIsClosed(final IInputPort<S, ?> inputPort) {
// inputPort.setState(IInputPort.State.CLOSING);
this.enabledInputPorts--;
......@@ -209,6 +201,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public void fireSignalClosingToAllInputPorts() {
// this.logger.info("Fire closing signal to all input ports of: " + this);
......@@ -224,6 +217,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public void fireSignalClosingToAllOutputPorts() {
try {
this.notifyOutputPipes(this.closeCommand);
......@@ -235,6 +229,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
public boolean mayBeDisabled() {
return this.mayBeDisabled;
}
......@@ -275,6 +270,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
@SuppressWarnings("unchecked")
public List<IInputPort<S, ?>> getInputPorts() {
return this.readOnlyInputPorts;
......@@ -283,6 +279,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
/**
* @since 1.10
*/
@Override
@SuppressWarnings("unchecked")
public List<IOutputPort<S, ?>> getOutputPorts() {
return this.readOnlyOutputPorts;
......@@ -300,34 +297,32 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
return outputStages;
}
@Override
public IInputPort<?, ?> getInputPortByIndex(final int index) {
return this.readOnlyInputPorts.get(index);
}
@Override
public IOutputPort<?, ?> getOutputPortByIndex(final int index) {
return this.readOnlyOutputPorts.get(index);
}
public long getOverallDurationInNs() {
return this.overallDurationInNs;
}
public long getLastDuration() {
return this.lastDuration;
}
@Override
public int getDepth() {
return this.depth;
}
@Override
public void setDepth(final int depth) {
this.depth = depth;
}
@Override
public int getSchedulingIndex() {
return this.schedulingIndex;
}
@Override
public void setSchedulingIndex(final int schedulingIndex) {
this.schedulingIndex = schedulingIndex;
}
......
......@@ -3,6 +3,11 @@ package teetime.framework.core;
import java.util.ArrayList;
import java.util.List;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class Context<S extends IStage> {
/**
......
......@@ -21,7 +21,7 @@ import java.util.List;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public interface IStage extends IBaseStage {
......@@ -36,11 +36,11 @@ public interface IStage extends IBaseStage {
/**
* @return <code>true</code> if the execution took enough tokens from the input ports so that the stage made progress due to this execution, <code>false</code>
* otherwise. The definition of <i>progress</i> depends on the semantics of the particular stage.
*
*
* <p>
* Example usage:
* </p>
*
*
* <pre>
* <code>
* boolean execute() {
......@@ -53,7 +53,7 @@ public interface IStage extends IBaseStage {
* }
* </code>
* </pre>
*
*
* @since 1.10
*/
boolean execute();
......@@ -70,9 +70,9 @@ public interface IStage extends IBaseStage {
// void execute(TaskBundle taskBundle);
/**
*
*
* @return <code>true</code> if the stage may be disabled by the pipeline scheduler, <code>false</code> otherwise.
*
*
* @since 1.10
*/
boolean mayBeDisabled();
......@@ -99,7 +99,7 @@ public interface IStage extends IBaseStage {
/**
* <i>Hint: Used by the scheduler</i>
*
*
* @since 1.10
*/
Context<?> getContext();
......@@ -126,7 +126,7 @@ public interface IStage extends IBaseStage {
/**
* <i>Hint: Only needed by stage schedulers.</i>
*
*
* @return
*/
public Collection<? extends IStage> getAllOutputStages();
......@@ -145,11 +145,6 @@ public interface IStage extends IBaseStage {
void setOwningThread(Thread owningThread);
/**
* @since 1.10
*/
long getLastDuration();
/**
* @since 1.10
*/
......
......@@ -47,23 +47,17 @@ public class StatisticsUtil {
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
final List<Long> sortedDurationsInNs = new ArrayList<Long>(timestampObjects.size() / 2);
long minDurationInNs = Long.MAX_VALUE;
long maxDurationInNs = Long.MIN_VALUE;
long sumInNs = 0;
for (int i = timestampObjects.size() / 2; i < timestampObjects.size(); i++) {
final TimestampObject timestampObject = timestampObjects.get(i);
final long durationInNs = timestampObject.getStopTimestamp() - timestampObject.getStartTimestamp();
// sortedDurationsInNs.set(i - (timestampObjects.size() / 2), durationInNs);
sortedDurationsInNs.add(durationInNs);
minDurationInNs = Math.min(durationInNs, minDurationInNs);
maxDurationInNs = Math.max(durationInNs, maxDurationInNs);
sumInNs += durationInNs;
}
final Map<Double, Long> quintileValues = StatisticsUtil.calculateQuintiles(sortedDurationsInNs);
System.out.println("min: " + TimeUnit.NANOSECONDS.toMicros(minDurationInNs) + " µs");
System.out.println("max: " + TimeUnit.NANOSECONDS.toMicros(maxDurationInNs) + " µs");
final long avgDurInNs = sumInNs / (timestampObjects.size() / 2);
System.out.println("avg duration: " + TimeUnit.NANOSECONDS.toMicros(avgDurInNs) + " µs");
......
......@@ -48,7 +48,7 @@ public class ThroughputTimestampAnalysisTest {
final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis();
analysis.setShouldUseQueue(true);
analysis.setNumNoopFilters(8);
analysis.setNumNoopFilters(10); // 4+n
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment