diff --git a/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java index 0da2e7dfd8155bbac58e37e840c29ca7c0f7f89e..c958a6e467c872254d51f6e414238dbfccf74458 100644 --- a/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/CountWordsAnalysis.java @@ -33,7 +33,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class CountWordsAnalysis extends Analysis { @@ -100,21 +100,25 @@ public class CountWordsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends AbstractFilter<?>> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -137,7 +141,7 @@ public class CountWordsAnalysis extends Analysis { for (final IStage stage : analysis.pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java index d19e4a7a4d05f534a5a11bcc690eee2a6d108174..3047df1cfb64c67bff3795dd2c264db56c834ed9 100644 --- a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java @@ -36,7 +36,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class QueuedCountWordsAnalysis extends Analysis { @@ -102,21 +102,25 @@ public class QueuedCountWordsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -145,7 +149,7 @@ public class QueuedCountWordsAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java b/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java index c286a540eac7522d19f89e33a304e1a965bb3d03..cee84c0b3525aac0fc6933713fe79e94216494ef 100644 --- a/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java +++ b/src/main/java/teetime/examples/countingObjects/CountingObjectsAnalysis.java @@ -33,7 +33,7 @@ import teetime.stage.composite.CycledCountingFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class CountingObjectsAnalysis extends Analysis { @@ -69,21 +69,25 @@ public class CountingObjectsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -127,7 +131,7 @@ public class CountingObjectsAnalysis extends Analysis { for (final IStage stage : analysis.pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java index cfadf9c89e4e86c3d56915950d315fd728b080ba..532aa1fed22ca99dd7dfdd6f3eabb8adc57f88b9 100644 --- a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java +++ b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java @@ -40,7 +40,7 @@ import teetime.stage.kieker.className.ClassNameRegistryRepository; /** * @author Christian Wulf - * + * * @since 1.10 */ public class RecordReaderAnalysis extends Analysis { @@ -152,7 +152,7 @@ public class RecordReaderAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { - System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) +// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } } diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java index ceb046473fe12f7b5677ff006f0e6397a441896d..309265e1727d280b78452c1c6577b09e6df23638 100644 --- a/src/main/java/teetime/framework/concurrent/WorkerThread.java +++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java @@ -37,8 +37,10 @@ public class WorkerThread extends Thread { private volatile boolean shouldTerminate = false; private final int accessesDeviceId; private int executedUnsuccessfullyCount; + private final StopWatch stopWatch = new StopWatch(); private final StopWatch iterationStopWatch = new StopWatch(); + private final StopWatch stageExecutionStopWatch = new StopWatch(); private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>(); private long durationInNs; @@ -69,7 +71,9 @@ public class WorkerThread extends Thread { final IStage stage = this.stageScheduler.get(); this.startStageExecution(stage); +// stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time final boolean executedSuccessfully = stage.execute(); +// stageExecutionStopWatch.end(); this.finishStageExecution(stage, executedSuccessfully); if (this.shouldTerminate) { @@ -78,7 +82,7 @@ public class WorkerThread extends Thread { this.stageScheduler.determineNextStage(stage, executedSuccessfully); this.iterationStopWatch.end(); - final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration(); + final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stageExecutionStopWatch.getDurationInNs(); schedulingOverheadInNs += schedulingOverhead; if ((iterations % 10000) == 0) { this.schedulingOverheadsInNs.add(schedulingOverheadInNs); diff --git a/src/main/java/teetime/framework/core/AbstractFilter.java b/src/main/java/teetime/framework/core/AbstractFilter.java index f0a246a7b0c41c6a1ec6a0f056a700d49a7f9551..83b6b9895c6a7263cfda7708b150c508b54c263c 100644 --- a/src/main/java/teetime/framework/core/AbstractFilter.java +++ b/src/main/java/teetime/framework/core/AbstractFilter.java @@ -22,18 +22,17 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import teetime.util.StopWatch; import teetime.util.concurrent.workstealing.exception.DequePopException; /** - * + * * @author Christian Wulf - * + * * @since 1.10 - * + * * @param <S> * the extending stage - * + * */ public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource, IPortListener<S> { @@ -41,7 +40,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @author Christian Wulf - * + * * @since 1.10 */ public enum StageState { @@ -66,12 +65,14 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp private Context<S> context; private final IPipeCommand closeCommand = new IPipeCommand() { + @Override public void execute(final IPipe<?> pipe) throws Exception { pipe.close(); } }; private final IPipeCommand pipelineStartsCommand = new IPipeCommand() { + @Override public void execute(final IPipe<?> pipe) throws Exception { pipe.notifyPipelineStarts(); } @@ -83,20 +84,18 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp */ private int accessesDeviceId = 0; - private final StopWatch stopWatch = new StopWatch(); - private long overallDurationInNs = 0; - - private long lastDuration; - + @Override public int getAccessesDeviceId() { return this.accessesDeviceId; } + @Override public void setAccessesDeviceId(final int accessesDeviceId) { this.accessesDeviceId = accessesDeviceId; } // BETTER return a limited context that allows "read" only + @Override public Context<S> getContext() { return this.context; } @@ -104,10 +103,11 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public final boolean execute() { boolean success = false; try { - success = this.executeLogged(this.context); + success = this.execute(this.context); if (success) { // deprecated boolean return value this.context.clear(); } else { @@ -121,20 +121,9 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp return success; } - private boolean executeLogged(final Context<S> context) { - this.stopWatch.start(); - try { - final boolean success = this.execute(context); - return success; - } finally { - this.stopWatch.end(); - this.lastDuration = this.stopWatch.getDurationInNs(); - this.overallDurationInNs += this.lastDuration; - } - } - protected abstract boolean execute(Context<S> context); + @Override public final void notifyPipelineStarts() throws Exception { if (this.state == StageState.UNINITIALIZED) { this.state = StageState.PIPELINE_STARTED; @@ -145,7 +134,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is started. - * + * * @throws Exception * @since 1.10 */ @@ -156,6 +145,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void notifyOutputPipes(final IPipeCommand pipeCommand) throws Exception { for (final IOutputPort<S, ?> outputPort : this.readOnlyOutputPorts) { final IPipe<?> associatedPipe = outputPort.getAssociatedPipe(); @@ -165,6 +155,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp } } + @Override public final void notifyPipelineStops() { if (this.state != StageState.PIPELINE_STOPPED) { this.state = StageState.PIPELINE_STOPPED; @@ -174,7 +165,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is stopped. - * + * * @since 1.10 */ public void onPipelineStops() { @@ -184,6 +175,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void onPortIsClosed(final IInputPort<S, ?> inputPort) { // inputPort.setState(IInputPort.State.CLOSING); this.enabledInputPorts--; @@ -209,6 +201,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void fireSignalClosingToAllInputPorts() { // this.logger.info("Fire closing signal to all input ports of: " + this); @@ -224,6 +217,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public void fireSignalClosingToAllOutputPorts() { try { this.notifyOutputPipes(this.closeCommand); @@ -235,6 +229,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override public boolean mayBeDisabled() { return this.mayBeDisabled; } @@ -275,6 +270,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override @SuppressWarnings("unchecked") public List<IInputPort<S, ?>> getInputPorts() { return this.readOnlyInputPorts; @@ -283,6 +279,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * @since 1.10 */ + @Override @SuppressWarnings("unchecked") public List<IOutputPort<S, ?>> getOutputPorts() { return this.readOnlyOutputPorts; @@ -300,34 +297,32 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp return outputStages; } + @Override public IInputPort<?, ?> getInputPortByIndex(final int index) { return this.readOnlyInputPorts.get(index); } + @Override public IOutputPort<?, ?> getOutputPortByIndex(final int index) { return this.readOnlyOutputPorts.get(index); } - public long getOverallDurationInNs() { - return this.overallDurationInNs; - } - - public long getLastDuration() { - return this.lastDuration; - } - + @Override public int getDepth() { return this.depth; } + @Override public void setDepth(final int depth) { this.depth = depth; } + @Override public int getSchedulingIndex() { return this.schedulingIndex; } + @Override public void setSchedulingIndex(final int schedulingIndex) { this.schedulingIndex = schedulingIndex; } diff --git a/src/main/java/teetime/framework/core/Context.java b/src/main/java/teetime/framework/core/Context.java index f0fc54509f8bef80b54bf8b0ab029722fd375a72..f39f5dd89e6de0f45501c603e79f105f1fe4dad9 100644 --- a/src/main/java/teetime/framework/core/Context.java +++ b/src/main/java/teetime/framework/core/Context.java @@ -3,6 +3,11 @@ package teetime.framework.core; import java.util.ArrayList; import java.util.List; +/** + * @author Christian Wulf + * + * @since 1.10 + */ public class Context<S extends IStage> { /** diff --git a/src/main/java/teetime/framework/core/IStage.java b/src/main/java/teetime/framework/core/IStage.java index bd5c09f06d77be84015e2651ed2ec17977b51676..19c68d9d3ef068cc0211f5ab8353e4372c28b8c7 100644 --- a/src/main/java/teetime/framework/core/IStage.java +++ b/src/main/java/teetime/framework/core/IStage.java @@ -21,7 +21,7 @@ import java.util.List; /** * @author Christian Wulf - * + * * @since 1.10 */ public interface IStage extends IBaseStage { @@ -36,11 +36,11 @@ public interface IStage extends IBaseStage { /** * @return <code>true</code> if the execution took enough tokens from the input ports so that the stage made progress due to this execution, <code>false</code> * otherwise. The definition of <i>progress</i> depends on the semantics of the particular stage. - * + * * <p> * Example usage: * </p> - * + * * <pre> * <code> * boolean execute() { @@ -53,7 +53,7 @@ public interface IStage extends IBaseStage { * } * </code> * </pre> - * + * * @since 1.10 */ boolean execute(); @@ -70,9 +70,9 @@ public interface IStage extends IBaseStage { // void execute(TaskBundle taskBundle); /** - * + * * @return <code>true</code> if the stage may be disabled by the pipeline scheduler, <code>false</code> otherwise. - * + * * @since 1.10 */ boolean mayBeDisabled(); @@ -99,7 +99,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Used by the scheduler</i> - * + * * @since 1.10 */ Context<?> getContext(); @@ -126,7 +126,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Only needed by stage schedulers.</i> - * + * * @return */ public Collection<? extends IStage> getAllOutputStages(); @@ -145,11 +145,6 @@ public interface IStage extends IBaseStage { void setOwningThread(Thread owningThread); - /** - * @since 1.10 - */ - long getLastDuration(); - /** * @since 1.10 */ diff --git a/src/main/java/teetime/util/StatisticsUtil.java b/src/main/java/teetime/util/StatisticsUtil.java index 6c664294a409af0f380437033268ca12003a4ab8..00e2b16fc696d7d3661a6722a8e3db95bf739efb 100644 --- a/src/main/java/teetime/util/StatisticsUtil.java +++ b/src/main/java/teetime/util/StatisticsUtil.java @@ -47,23 +47,17 @@ public class StatisticsUtil { System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); final List<Long> sortedDurationsInNs = new ArrayList<Long>(timestampObjects.size() / 2); - long minDurationInNs = Long.MAX_VALUE; - long maxDurationInNs = Long.MIN_VALUE; long sumInNs = 0; for (int i = timestampObjects.size() / 2; i < timestampObjects.size(); i++) { final TimestampObject timestampObject = timestampObjects.get(i); final long durationInNs = timestampObject.getStopTimestamp() - timestampObject.getStartTimestamp(); // sortedDurationsInNs.set(i - (timestampObjects.size() / 2), durationInNs); sortedDurationsInNs.add(durationInNs); - minDurationInNs = Math.min(durationInNs, minDurationInNs); - maxDurationInNs = Math.max(durationInNs, maxDurationInNs); sumInNs += durationInNs; } final Map<Double, Long> quintileValues = StatisticsUtil.calculateQuintiles(sortedDurationsInNs); - System.out.println("min: " + TimeUnit.NANOSECONDS.toMicros(minDurationInNs) + " µs"); - System.out.println("max: " + TimeUnit.NANOSECONDS.toMicros(maxDurationInNs) + " µs"); final long avgDurInNs = sumInNs / (timestampObjects.size() / 2); System.out.println("avg duration: " + TimeUnit.NANOSECONDS.toMicros(avgDurInNs) + " µs"); diff --git a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java index b6f1f8a37460e14d6f25e9d78287fb83f5df3d62..c11ce9390642f6a57d19d686734b652c866b2d62 100644 --- a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java +++ b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java @@ -48,7 +48,7 @@ public class ThroughputTimestampAnalysisTest { final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis(); analysis.setShouldUseQueue(true); - analysis.setNumNoopFilters(8); + analysis.setNumNoopFilters(10); // 4+n analysis.setTimestampObjects(timestampObjects); analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { @Override