diff --git a/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java index cfc813af151a2d90b022c5d34078d5b5b9c25a2e..4fa5036223f19f0de33dbe846523a500dd48ed5c 100644 --- a/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java @@ -20,7 +20,6 @@ import java.io.File; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; import teetime.framework.concurrent.ConcurrentWorkStealingPipe; import teetime.framework.concurrent.ConcurrentWorkStealingPipeFactory; @@ -43,7 +42,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ConcurrentCountWordsAnalysis extends Analysis { @@ -155,21 +154,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis { repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE)); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(repeaterSource); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -205,21 +208,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis { SingleProducerSingleConsumerPipe.connect(merger.outputPort, printingMerger.getNewInputPort()); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(distributor); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -246,21 +253,25 @@ public class ConcurrentCountWordsAnalysis extends Analysis { QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort); final IPipeline pipeline = new IPipeline() { + @Override @SuppressWarnings("unchecked") public List<? extends IStage> getStartStages() { return Arrays.asList(merger); } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -316,8 +327,8 @@ public class ConcurrentCountWordsAnalysis extends Analysis { System.out.println(stage); // NOPMD (Just for example purposes) } - final long durationInNs = thread.getDurationInNs(); - System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms"); +// final long durationInNs = thread.getDurationInNs(); +// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms"); } for (final WorkerThread thread : this.nonIoThreads) { @@ -326,16 +337,16 @@ public class ConcurrentCountWordsAnalysis extends Analysis { System.out.println(stage); // NOPMD (Just for example purposes) } - final long durationInNs = thread.getDurationInNs(); - System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms"); +// final long durationInNs = thread.getDurationInNs(); +// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms"); - if (durationInNs > maxDuration) { - maxDuration = durationInNs; - maxThread = thread; - } +// if (durationInNs > maxDuration) { +// maxDuration = durationInNs; +// maxThread = thread; +// } } - System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example +// System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example // purposes) } } diff --git a/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java b/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java index 6a87b53ba4c8598938915d5ea196c3d0f64de67a..98984ffdbc4bf2a153dab702269310a68ddc9260 100644 --- a/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java +++ b/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java @@ -19,7 +19,6 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import teetime.framework.concurrent.StageTerminationPolicy; import teetime.framework.concurrent.WorkerThread; @@ -33,7 +32,7 @@ import teetime.stage.basic.ObjectProducer; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ThroughputAnalysis<T> extends Analysis { @@ -101,7 +100,6 @@ public class ThroughputAnalysis<T> extends Analysis { e.printStackTrace(); } - System.out.println("SchedulingOverhead: " + TimeUnit.NANOSECONDS.toMillis(this.workerThread.computeSchedulingOverheadInNs()) + " ms"); } public int getNumNoopFilters() { diff --git a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java index 4c813011109ec57765a85dbaca57a792d2030185..547a02f4dc6cd502e34199a2ea0bf7d19b4c6033 100644 --- a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java +++ b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java @@ -131,15 +131,24 @@ public class ThroughputTimestampAnalysis extends Analysis { e.printStackTrace(); } - final long schedulingOverheadInNs = this.workerThread.computeSchedulingOverheadInNs(); - final int size = this.workerThread.getSchedulingOverheadsInNs().size(); - System.out.println("scheduling overhead times: " + size); - if (size > 0) { - System.out.println("SchedulingOverhead: " + TimeUnit.NANOSECONDS.toMillis(schedulingOverheadInNs) + " ms"); - System.out.println("avg overhead of iteration: " - + TimeUnit.NANOSECONDS.toMillis(schedulingOverheadInNs * 2 / size) + " ms"); - System.out.println("ExecutedUnsuccessfullyCount: " + this.workerThread.getExecutedUnsuccessfullyCount()); + List<Long> durationPer10000IterationsInNs = workerThread.getDurationPer10000IterationsInNs(); + + long overallSumInNs = 0; + for (int i = 0; i < durationPer10000IterationsInNs.size(); i++) { + overallSumInNs += durationPer10000IterationsInNs.get(i); + } + + long sumInNs = 0; + for (int i = durationPer10000IterationsInNs.size() / 2; i < durationPer10000IterationsInNs.size(); i++) { + sumInNs += durationPer10000IterationsInNs.get(i); } + + System.out.println("Thread iterations: " + workerThread.getIterations() + " times"); + System.out.println("Thread execution time: " + TimeUnit.NANOSECONDS.toMillis(overallSumInNs) + " ms"); + System.out.println("Thread half duration/iterations: " + sumInNs / (workerThread.getIterations() / 2) + + " ns/iteration"); + System.out.println("Thread unsuccessfully executed stages: " + workerThread.getExecutedUnsuccessfullyCount() + + " times"); } public int getNumNoopFilters() { diff --git a/src/main/java/teetime/framework/concurrent/NextStageScheduler.java b/src/main/java/teetime/framework/concurrent/NextStageScheduler.java index e45a786abe3b46c9e5b517d6fa69e9cd5ace15c3..82263b5b569a1057e730cabd07bbb8e37f0b5962 100644 --- a/src/main/java/teetime/framework/concurrent/NextStageScheduler.java +++ b/src/main/java/teetime/framework/concurrent/NextStageScheduler.java @@ -19,18 +19,15 @@ package teetime.framework.concurrent; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import teetime.framework.core.IOutputPort; import teetime.framework.core.IPipeline; import teetime.framework.core.IStage; -import teetime.util.StopWatch; /** * @author Christian Wulf - * + * * @since 1.10 */ public class NextStageScheduler implements IStageScheduler { @@ -39,11 +36,6 @@ public class NextStageScheduler implements IStageScheduler { private final Collection<IStage> highestPrioritizedEnabledStages = new ArrayList<IStage>(); private final IStageWorkList workList; - private final StopWatch stopWatch = new StopWatch(); - private long durationInNs; - private int iterations; - private final List<Long> durations = new LinkedList<Long>(); - public NextStageScheduler(final IPipeline pipeline, final int accessesDeviceId) throws Exception { // this.workList = new StageWorkList(accessesDeviceId, pipeline.getStages().size()); this.workList = new StageWorkArrayList(pipeline, accessesDeviceId); // faster implementation @@ -93,10 +85,6 @@ public class NextStageScheduler implements IStageScheduler { @Override public void determineNextStage(final IStage stage, final boolean executedSuccessfully) { - this.iterations++; - - this.stopWatch.start(); - // final Collection<? extends IStage> outputStages = stage.getContext().getOutputStages(); final IOutputPort<?, ?>[] outputPorts = stage.getContext().getOutputPorts(); if (outputPorts.length > 0) { @@ -120,17 +108,5 @@ public class NextStageScheduler implements IStageScheduler { if (this.workList.isEmpty()) { this.workList.pushAll(this.highestPrioritizedEnabledStages); } - - this.stopWatch.end(); - - this.durationInNs += this.stopWatch.getDurationInNs(); - if ((this.iterations % 10000) == 0) { - this.durations.add(this.durationInNs); - this.durationInNs = 0; - } - } - - public List<Long> getDurations() { - return this.durations; } } diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java index fe1623646ffb3e60ddc5930ca25eda20610569f2..feb78b689bf9106f08b0a8673bc64d5721469f30 100644 --- a/src/main/java/teetime/framework/concurrent/WorkerThread.java +++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java @@ -38,13 +38,10 @@ public class WorkerThread extends Thread { private final int accessesDeviceId; private int executedUnsuccessfullyCount; + // statistics private final StopWatch stopWatch = new StopWatch(); - private final StopWatch iterationStopWatch = new StopWatch(); - private final StopWatch beforeStageExecutionStopWatch = new StopWatch(); - private final StopWatch afterStageExecutionStopWatch = new StopWatch(); - private final StopWatch stageExecutionStopWatch = new StopWatch(); - private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>(); - private long durationInNs; + private final List<Long> durationPer10000IterationsInNs = new LinkedList<Long>(); + private int iterations; public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) { this.pipeline = pipeline; @@ -62,63 +59,62 @@ public class WorkerThread extends Thread { throw new IllegalStateException(e); } - long iterations = 0; - long schedulingOverheadInNs = 0; + this.iterations = 0; this.stopWatch.start(); while (this.stageScheduler.isAnyStageActive()) { - iterations++; - this.iterationStopWatch.start(); + this.iterations++; + // this.iterationStopWatch.start(); -// beforeStageExecutionStopWatch.start(); + // beforeStageExecutionStopWatch.start(); final IStage stage = this.stageScheduler.get(); -// beforeStageExecutionStopWatch.end(); + // beforeStageExecutionStopWatch.end(); this.startStageExecution(stage); - stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time + // stageExecutionStopWatch.start(); // expensive: takes 1/3 of overall time final boolean executedSuccessfully = stage.execute(); - stageExecutionStopWatch.end(); + // stageExecutionStopWatch.end(); this.finishStageExecution(stage, executedSuccessfully); -// afterStageExecutionStopWatch.start(); + // afterStageExecutionStopWatch.start(); if (this.shouldTerminate) { this.executeTerminationPolicy(stage, executedSuccessfully); } this.stageScheduler.determineNextStage(stage, executedSuccessfully); -// afterStageExecutionStopWatch.end(); + // afterStageExecutionStopWatch.end(); - this.iterationStopWatch.end(); - final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stageExecutionStopWatch.getDurationInNs(); //3198 ms -// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs(); //3656 ms -// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //417 ms -// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //503 ms -// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //1214 ms - schedulingOverheadInNs += schedulingOverhead; + // this.iterationStopWatch.end(); + + // all stop watches are activated + // final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - + // stageExecutionStopWatch.getDurationInNs(); //4952 + + // 6268 -> 5350 (w/o after) -> 4450 (w/o before) -> 3800 (w/o stage) +// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs(); + // final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327 + // final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416 + // final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450 + // rest: ~2000 (measurement overhead?) if ((iterations % 10000) == 0) { - this.schedulingOverheadsInNs.add(schedulingOverheadInNs); - schedulingOverheadInNs = 0; + this.stopWatch.end(); + this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs()); + this.stopWatch.start(); } } this.stopWatch.end(); - this.durationInNs = this.stopWatch.getDurationInNs(); - - final List<Long> durations = ((NextStageScheduler) this.stageScheduler).getDurations(); - long overallDuration = 0; - for (int i = durations.size() / 2; i < durations.size(); i++) { - overallDuration += durations.get(i); - } - // System.out.println("Scheduler determine next stage (" + (durations.size() / 2) + "): " + TimeUnit.NANOSECONDS.toMillis(overallDuration) + " ms"); + this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs()); this.cleanUpDatastructures(); } private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) { - // System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy + ", executedSuccessfully=" + executedSuccessfully + // System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy + + // ", executedSuccessfully=" + executedSuccessfully // + ", mayBeDisabled=" + executedStage.mayBeDisabled()); switch (this.terminationPolicy) { @@ -172,7 +168,8 @@ public class WorkerThread extends Thread { return this.pipeline; } - // BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically disables itself if it has no input ports + // BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically + // disables itself if it has no input ports public void terminate(final StageTerminationPolicy terminationPolicyToUse) { for (final IStage startStage : this.pipeline.getStartStages()) { startStage.fireSignalClosingToAllInputPorts(); @@ -195,31 +192,15 @@ public class WorkerThread extends Thread { return this.executedUnsuccessfullyCount; } - public List<Long> getSchedulingOverheadsInNs() { - return this.schedulingOverheadsInNs; + public List<Long> getDurationPer10000IterationsInNs() { + return durationPer10000IterationsInNs; } /** * @since 1.10 */ - public long getDurationInNs() { - return this.durationInNs; + public int getIterations() { + return iterations; } - /** - * Uses the last half of values to compute the scheduling overall overhead in ns - * - * @since 1.10 - */ - public long computeSchedulingOverheadInNs() { - final int size = this.schedulingOverheadsInNs.size(); - - long schedulingOverheadInNs = 0; - for (int i = size / 2; i < size; i++) { - final Long iterationOverhead = this.schedulingOverheadsInNs.get(i); - schedulingOverheadInNs += iterationOverhead; - } - - return schedulingOverheadInNs; - } } diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index ccefdc7bd34acd3820ec1029d837787e5146c426..b2a135c4914402e76276fe5bfc7092144caf2f2b 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -20,7 +20,6 @@ import java.util.Collection; import teetime.framework.core.AbstractFilter; import teetime.framework.core.Context; import teetime.framework.core.IInputPort; -import teetime.util.StopWatch; /** * @author Christian Wulf @@ -50,22 +49,13 @@ public class CollectorSink<T> extends AbstractFilter<CollectorSink<T>> { } this.objects.add(object); + if ((this.objects.size() % THRESHOLD) == 0) { System.out.println("size: " + this.objects.size()); -// stopWatch.end(); -// System.out.println("duration: "+TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs())+" ms"); -// stopWatch.start(); } return true; } -StopWatch stopWatch=new StopWatch(); - - @Override - public void onPipelineStarts() throws Exception { - stopWatch.start(); - super.onPipelineStarts(); - } public Collection<T> getObjects() { return this.objects; diff --git a/src/test/java/kieker/analysis/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/kieker/analysis/examples/throughput/ThroughputTimestampAnalysisTest.java index fe5a6c4c469a5d1e0680a53f725f9d154338a143..e324375a3586747058b36aea81e6345da6031438 100644 --- a/src/test/java/kieker/analysis/examples/throughput/ThroughputTimestampAnalysisTest.java +++ b/src/test/java/kieker/analysis/examples/throughput/ThroughputTimestampAnalysisTest.java @@ -39,6 +39,7 @@ import teetime.util.StopWatch; public class ThroughputTimestampAnalysisTest { private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; @Before public void before() { @@ -47,11 +48,13 @@ public class ThroughputTimestampAnalysisTest { @Test public void testWithManyObjects() throws IllegalStateException, AnalysisConfigurationException { + System.out.println("Testing kieker with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); final StopWatch stopWatch = new StopWatch(); final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE); final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis(); - analysis.setNumNoopFilters(800); + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); analysis.setTimestampObjects(timestampObjects); analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { @Override diff --git a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java index bce03257e251593e379594ddfb0a4bd490cf2a36..a122c8ec94850bb614f9ed6ac7c8fc314341057c 100644 --- a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java +++ b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java @@ -35,6 +35,7 @@ import teetime.util.StopWatch; public class ThroughputTimestampAnalysisTest { private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; @Before public void before() { @@ -43,12 +44,14 @@ public class ThroughputTimestampAnalysisTest { @Test public void testWithManyObjects() { + System.out.println("Testing teetime with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); final StopWatch stopWatch = new StopWatch(); final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE); final ThroughputTimestampAnalysis analysis = new ThroughputTimestampAnalysis(); analysis.setShouldUseQueue(true); - analysis.setNumNoopFilters(800); // 4+n + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); // 4+n analysis.setTimestampObjects(timestampObjects); analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { @Override