From d4eb717224d26601b1ba252ca2a5840e685da79b Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 3 Jun 2014 22:52:38 +0200 Subject: [PATCH] worked on the different performance results --- .../countWords/CountWordsAnalysis.java | 8 ++- .../countWords/QueuedCountWordsAnalysis.java | 8 ++- .../CountingObjectsAnalysis.java | 8 ++- .../recordReader/RecordReaderAnalysis.java | 4 +- .../framework/concurrent/WorkerThread.java | 6 +- .../framework/core/AbstractFilter.java | 63 +++++++++---------- .../java/teetime/framework/core/Context.java | 5 ++ .../java/teetime/framework/core/IStage.java | 21 +++---- .../java/teetime/util/StatisticsUtil.java | 6 -- .../ThroughputTimestampAnalysisTest.java | 2 +- 10 files changed, 68 insertions(+), 63 deletions(-) diff --git a/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java index 0da2e7df..c958a6e4 100644 --- a/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java @@ -33,7 +33,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class CountWordsAnalysis extends Analysis { @@ -100,21 +100,25 @@ public class CountWordsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends AbstractFilter<?>> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -137,7 +141,7 @@ public class CountWordsAnalysis extends Analysis { for (final IStage stage : analysis.pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java index d19e4a7a..3047df1c 100644 --- a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java @@ -36,7 +36,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class QueuedCountWordsAnalysis extends Analysis { @@ -102,21 +102,25 @@ public class QueuedCountWordsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -145,7 +149,7 @@ public class QueuedCountWordsAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java b/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java index c286a540..cee84c0b 100644 --- a/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java +++ b/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java @@ -33,7 +33,7 @@ import teetime.stage.composite.CycledCountingFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class CountingObjectsAnalysis extends Analysis { @@ -69,21 +69,25 @@ public class CountingObjectsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -127,7 +131,7 @@ public class CountingObjectsAnalysis extends Analysis { for (final IStage stage : analysis.pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java index cfadf9c8..532aa1fe 100644 --- a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java +++ b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java @@ -40,7 +40,7 @@ import teetime.stage.kieker.className.ClassNameRegistryRepository; /** * @author Christian Wulf - * + * * @since 1.10 */ public class RecordReaderAnalysis extends Analysis { @@ -152,7 +152,7 @@ public class RecordReaderAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } } diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java index ceb04647..309265e1 100644 --- a/src/main/java/teetime/framework/concurrent/WorkerThread.java +++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java @@ -37,8 +37,10 @@ public class WorkerThread extends Thread { private volatile boolean shouldTerminate = false; private final int accessesDeviceId; private int executedUnsuccessfullyCount; + private final StopWatch stopWatch = new StopWatch(); private final StopWatch iterationStopWatch = new StopWatch(); + private final StopWatch stageExecutionStopWatch = new StopWatch(); private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>(); private long durationInNs; @@ -69,7 +71,9 @@ public class WorkerThread extends Thread { final IStage stage = this.stageScheduler.get(); this.startStageExecution(stage); +// stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time final boolean executedSuccessfully = stage.execute(); +// stageExecutionStopWatch.end(); this.finishStageExecution(stage, executedSuccessfully); if (this.shouldTerminate) { @@ -78,7 +82,7 @@ public class WorkerThread extends Thread { this.stageScheduler.determineNextStage(stage, executedSuccessfully); this.iterationStopWatch.end(); - final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration(); + final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stageExecutionStopWatch.getDurationInNs(); schedulingOverheadInNs += schedulingOverhead; if ((iterations % 10000) == 0) { this.schedulingOverheadsInNs.add(schedulingOverheadInNs); diff --git a/src/main/java/teetime/framework/core/AbstractFilter.java b/src/main/java/teetime/framework/core/AbstractFilter.java index f0a246a7..83b6b989 100644 --- a/src/main/java/teetime/framework/core/AbstractFilter.java +++ b/src/main/java/teetime/framework/core/AbstractFilter.java @@ -22,18 +22,17 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import teetime.util.StopWatch; import teetime.util.concurrent.workstealing.exception.DequePopException; /** - * + * * @author Christian Wulf - * + * * @since 1.10 - * + * * @param <S> * the extending stage - * + * */ public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource, IPortListener<S> { @@ -41,7 +40,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @author Christian Wulf - * + * * @since 1.10 */ public enum StageState { @@ -66,12 +65,14 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp private Context<S> context; private final IPipeCommand closeCommand = new IPipeCommand() { + @Override public void execute(final IPipe<?> pipe) throws Exception { pipe.close(); } }; private final IPipeCommand pipelineStartsCommand = new IPipeCommand() { + @Override public void execute(final IPipe<?> pipe) throws Exception { pipe.notifyPipelineStarts(); } @@ -83,20 +84,18 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp */ private int accessesDeviceId = 0; - private final StopWatch stopWatch = new StopWatch(); - private long overallDurationInNs = 0; - - private long lastDuration; - + @Override public int getAccessesDeviceId() { return this.accessesDeviceId; } + @Override public void setAccessesDeviceId(final int accessesDeviceId) { this.accessesDeviceId = accessesDeviceId; } // BETTER return a limited context that allows "read" only + @Override public Context<S> getContext() { return this.context; } @@ -104,10 +103,11 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public final boolean execute() { boolean success = false; try { - success = this.executeLogged(this.context); + success = this.execute(this.context); if (success) { // deprecated boolean return value this.context.clear(); } else { @@ -121,20 +121,9 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp return success; } - private boolean executeLogged(final Context<S> context) { - this.stopWatch.start(); - try { - final boolean success = this.execute(context); - return success; - } finally { - this.stopWatch.end(); - this.lastDuration = this.stopWatch.getDurationInNs(); - this.overallDurationInNs += this.lastDuration; - } - } - protected abstract boolean execute(Context<S> context); + @Override public final void notifyPipelineStarts() throws Exception { if (this.state == StageState.UNINITIALIZED) { this.state = StageState.PIPELINE_STARTED; @@ -145,7 +134,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is started. - * + * * @throws Exception * @since 1.10 */ @@ -156,6 +145,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void notifyOutputPipes(final IPipeCommand pipeCommand) throws Exception { for (final IOutputPort<S, ?> outputPort : this.readOnlyOutputPorts) { final IPipe<?> associatedPipe = outputPort.getAssociatedPipe(); @@ -165,6 +155,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp } } + @Override public final void notifyPipelineStops() { if (this.state != StageState.PIPELINE_STOPPED) { this.state = StageState.PIPELINE_STOPPED; @@ -174,7 +165,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is stopped. - * + * * @since 1.10 */ public void onPipelineStops() { @@ -184,6 +175,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void onPortIsClosed(final IInputPort<S, ?> inputPort) { // inputPort.setState(IInputPort.State.CLOSING); this.enabledInputPorts--; @@ -209,6 +201,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void fireSignalClosingToAllInputPorts() { // this.logger.info("Fire closing signal to all input ports of: " + this); @@ -224,6 +217,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void fireSignalClosingToAllOutputPorts() { try { this.notifyOutputPipes(this.closeCommand); @@ -235,6 +229,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public boolean mayBeDisabled() { return this.mayBeDisabled; } @@ -275,6 +270,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override @SuppressWarnings("unchecked") public List<IInputPort<S, ?>> getInputPorts() { return this.readOnlyInputPorts; @@ -283,6 +279,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override @SuppressWarnings("unchecked") public List<IOutputPort<S, ?>> getOutputPorts() { return this.readOnlyOutputPorts; @@ -300,34 +297,32 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp return outputStages; } + @Override public IInputPort<?, ?> getInputPortByIndex(final int index) { return this.readOnlyInputPorts.get(index); } + @Override public IOutputPort<?, ?> getOutputPortByIndex(final int index) { return this.readOnlyOutputPorts.get(index); } - public long getOverallDurationInNs() { - return this.overallDurationInNs; - } - - public long getLastDuration() { - return this.lastDuration; - } - + @Override public int getDepth() { return this.depth; } + @Override public void setDepth(final int depth) { this.depth = depth; } + @Override public int getSchedulingIndex() { return this.schedulingIndex; } + @Override public void setSchedulingIndex(final int schedulingIndex) { this.schedulingIndex = schedulingIndex; } diff --git a/src/main/java/teetime/framework/core/Context.java b/src/main/java/teetime/framework/core/Context.java index f0fc5450..f39f5dd8 100644 --- a/src/main/java/teetime/framework/core/Context.java +++ b/src/main/java/teetime/framework/core/Context.java @@ -3,6 +3,11 @@ package teetime.framework.core; import java.util.ArrayList; import java.util.List; +/** + * @author Christian Wulf + * + * @since 1.10 + */ public class Context<S extends IStage> { /** diff --git a/src/main/java/teetime/framework/core/IStage.java b/src/main/java/teetime/framework/core/IStage.java index bd5c09f0..19c68d9d 100644 --- a/src/main/java/teetime/framework/core/IStage.java +++ b/src/main/java/teetime/framework/core/IStage.java @@ -21,7 +21,7 @@ import java.util.List; /** * @author Christian Wulf - * + * * @since 1.10 */ public interface IStage extends IBaseStage { @@ -36,11 +36,11 @@ public interface IStage extends IBaseStage { /** * @return <code>true</code> if the execution took enough tokens from the input ports so that the stage made progress due to this execution, <code>false</code> * otherwise. The definition of <i>progress</i> depends on the semantics of the particular stage. - * + * * <p> * Example usage: * </p> - * + * * <pre> * <code> * boolean execute() { @@ -53,7 +53,7 @@ public interface IStage extends IBaseStage { * } * </code> * </pre> - * + * * @since 1.10 */ boolean execute(); @@ -70,9 +70,9 @@ public interface IStage extends IBaseStage { // void execute(TaskBundle taskBundle); /** - * + * * @return <code>true</code> if the stage may be disabled by the pipeline scheduler, <code>false</code> otherwise. - * + * * @since 1.10 */ boolean mayBeDisabled(); @@ -99,7 +99,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Used by the scheduler</i> - * + * * @since 1.10 */ Context<?> getContext(); @@ -126,7 +126,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Only needed by stage schedulers.</i> - * + * * @return */ public Collection<? extends IStage> getAllOutputStages(); @@ -145,11 +145,6 @@ public interface IStage extends IBaseStage { void setOwningThread(Thread owningThread); - /** - * @since 1.10 - */ - long getLastDuration(); - /** * @since 1.10 */ diff --git a/src/main/java/teetime/util/StatisticsUtil.java b/src/main/java/teetime/util/StatisticsUtil.java index 6c664294..00e2b16f 100644 --- a/src/main/java/teetime/util/StatisticsUtil.java +++ b/src/main/java/teetime/util/StatisticsUtil.java @@ -47,23 +47,17 @@ public class StatisticsUtil { System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); final List<Long> sortedDurationsInNs = new ArrayList<Long>(timestampObjects.size() / 2); - long minDurationInNs = Long.MAX_VALUE; - long maxDurationInNs = Long.MIN_VALUE; long sumInNs = 0; for (int i = timestampObjects.size() / 2; i < timestampObjects.size(); i++) { final TimestampObject timestampObject = timestampObjects.get(i); final long durationInNs = timestampObject.getStopTimestamp() - timestampObject.getStartTimestamp(); // sortedDurationsInNs.set(i - (timestampObjects.size() / 2), durationInNs); sortedDurationsInNs.add(durationInNs); - minDurationInNs = Math.min(durationInNs, minDurationInNs); - maxDurationInNs = Math.max(durationInNs, maxDurationInNs); sumInNs += durationInNs; } final Map<Double, Long> quintileValues = StatisticsUtil.calculateQuintiles(sortedDurationsInNs); - System.out.println("min: " + TimeUnit.NANOSECONDS.toMicros(minDurationInNs) + " µs"); - System.out.println("max: " + TimeUnit.NANOSECONDS.toMicros(maxDurationInNs) + " µs"); final long avgDurInNs = sumInNs / (timestampObjects.size() / 2); System.out.println("avg duration: " + TimeUnit.NANOSECONDS.toMicros(avgDurInNs) + " µs"); diff --git a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java index b6f1f8a3..c11ce939 100644 --- a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java +++ b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java @@ -48,7 +48,7 @@ public class ThroughputTimestampAnalysisTest { final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis(); analysis.setShouldUseQueue(true); - analysis.setNumNoopFilters(8); + analysis.setNumNoopFilters(10); // 4+n analysis.setTimestampObjects(timestampObjects); analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { @Override -- GitLab