From 933b878694691795f299bfcd0f50bbb9c1eead53 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 6 Jun 2014 15:27:25 +0200 Subject: [PATCH] added IStage.setSchedulable(); added analysis to Experiment1 --- src/main/java/experiment/Experiment1.java | 75 ++++++++++++++++++- .../ConcurrentCountWordsAnalysis.java | 4 +- .../countWords/QueuedCountWordsAnalysis.java | 6 +- .../recordReader/RecordReaderAnalysis.java | 9 ++- .../throughput/ThroughputAnalysis.java | 4 +- .../ThroughputTimestampAnalysis.java | 28 ++++--- .../TraceReconstructionAnalysis.java | 7 +- .../TraceReconstructionAnalysis2.java | 13 ++-- .../concurrent/StageWorkArrayList.java | 4 +- .../framework/concurrent/WorkerThread.java | 20 +---- .../framework/core/AbstractFilter.java | 24 ++++-- .../framework/core/CompositeFilter.java | 12 +++ .../java/teetime/framework/core/IStage.java | 17 +++-- .../stage/kieker/File2RecordFilter.java | 24 +++--- ...hodCallThoughputTimestampAnalysisTest.java | 11 ++- .../ThroughputTimestampAnalysisTest.java | 2 +- 16 files changed, 179 insertions(+), 81 deletions(-) diff --git a/src/main/java/experiment/Experiment1.java b/src/main/java/experiment/Experiment1.java index bcd2fb55..e663f583 100644 --- a/src/main/java/experiment/Experiment1.java +++ b/src/main/java/experiment/Experiment1.java @@ -29,6 +29,7 @@ import teetime.framework.concurrent.WorkerThread; import teetime.framework.core.Analysis; import teetime.framework.core.IStage; import teetime.framework.core.Pipeline; +import teetime.framework.sequential.MethodCallPipe; import teetime.framework.sequential.QueuePipe; import teetime.stage.NoopFilter; import teetime.util.StatisticsUtil; @@ -41,7 +42,7 @@ import kieker.common.configuration.Configuration; /** * @author Nils Christian Ehmke - * + * * @since 1.10 */ public class Experiment1 { @@ -55,7 +56,7 @@ public class Experiment1 { private static final int NUMBER_OF_MAXIMAL_FILTERS = 1000; private static final int NUMBER_OF_FILTERS_PER_STEP = 50; - private static final IAnalysis[] analyses = { new TeeTimeAnalysis(), new KiekerAnalysis() }; + private static final IAnalysis[] analyses = { new TeeTimeMethodCallAnalysis(), new TeeTimeAnalysis(), new KiekerAnalysis() }; private static final List<Long> measuredTimes = new ArrayList<Long>(); @@ -122,6 +123,74 @@ public class Experiment1 { } + private static final class TeeTimeMethodCallAnalysis extends Analysis implements IAnalysis { + + private static final int SECONDS = 1000; + + private Pipeline pipeline; + private WorkerThread workerThread; + + public TeeTimeMethodCallAnalysis() {} + + @Override + public void initialize(final int numberOfFilters, final int numberOfObjectsToSend) { + + @SuppressWarnings("unchecked") + final NoopFilter<Object>[] noopFilters = new NoopFilter[numberOfFilters]; + // create stages + final teetime.stage.basic.ObjectProducer<Object> objectProducer = new teetime.stage.basic.ObjectProducer<Object>( + numberOfObjectsToSend, new Callable<Object>() { + @Override + public Object call() throws Exception { + return new Object(); + } + }); + for (int i = 0; i < noopFilters.length; i++) { + noopFilters[i] = new NoopFilter<Object>(); + noopFilters[i].setSchedulable(false); + } + + // add each stage to a stage list + final List<IStage> startStages = new LinkedList<IStage>(); + startStages.add(objectProducer); + + final List<IStage> stages = new LinkedList<IStage>(); + stages.add(objectProducer); + stages.addAll(Arrays.asList(noopFilters)); + + // connect stages by pipes + MethodCallPipe.connect(objectProducer.outputPort, noopFilters[0].inputPort); + for (int i = 1; i < noopFilters.length; i++) { + MethodCallPipe.connect(noopFilters[i - 1].outputPort, noopFilters[i].inputPort); + } + + this.pipeline = new Pipeline(); + this.pipeline.setStartStages(startStages); + this.pipeline.setStages(stages); + + this.workerThread = new WorkerThread(this.pipeline, 0); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + } + + @Override + public String getName() { + return "TeeTimeMethodCall"; + } + + @Override + public void execute() { + super.start(); + + this.workerThread.start(); + try { + this.workerThread.join(60 * SECONDS); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + } + + } + private static final class TeeTimeAnalysis extends Analysis implements IAnalysis { private static final int SECONDS = 1000; @@ -167,7 +236,7 @@ public class Experiment1 { this.pipeline.setStages(stages); this.workerThread = new WorkerThread(this.pipeline, 0); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); } @Override diff --git a/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java index 70243758..26d7ecc9 100644 --- a/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/ConcurrentCountWordsAnalysis.java @@ -43,7 +43,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ConcurrentCountWordsAnalysis extends Analysis { @@ -70,7 +70,7 @@ public class ConcurrentCountWordsAnalysis extends Analysis { final Distributor<File> distributor = (Distributor<File>) readerThreadPipeline.getStages().get(readerThreadPipeline.getStages().size() - 1); this.ioThreads[0] = new WorkerThread(readerThreadPipeline, 1); this.ioThreads[0].setName("startThread"); - this.ioThreads[0].terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.ioThreads[0].setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); final IPipeline printingThreadPipeline = this.printingThreadPipeline(); @SuppressWarnings("unchecked") diff --git a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java index afb82bbc..e686457e 100644 --- a/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java +++ b/src/main/java/teetime/examples/countWords/QueuedCountWordsAnalysis.java @@ -37,7 +37,7 @@ import teetime.util.Pair; /** * @author Christian Wulf - * + * * @since 1.10 */ public class QueuedCountWordsAnalysis extends Analysis { @@ -61,7 +61,7 @@ public class QueuedCountWordsAnalysis extends Analysis { public void start() { super.start(); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.start(); try { @@ -130,7 +130,7 @@ public class QueuedCountWordsAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { -// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) + // System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } diff --git a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java index 532aa1fe..360629b6 100644 --- a/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java +++ b/src/main/java/teetime/examples/recordReader/RecordReaderAnalysis.java @@ -19,7 +19,6 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import kieker.common.record.IMonitoringRecord; import teetime.framework.concurrent.StageTerminationPolicy; import teetime.framework.concurrent.WorkerThread; import teetime.framework.core.AbstractFilter; @@ -38,9 +37,11 @@ import teetime.stage.CollectorSink; import teetime.stage.kieker.File2RecordFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; +import kieker.common.record.IMonitoringRecord; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class RecordReaderAnalysis extends Analysis { @@ -65,7 +66,7 @@ public class RecordReaderAnalysis extends Analysis { public void start() { super.start(); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.start(); try { @@ -152,7 +153,7 @@ public class RecordReaderAnalysis extends Analysis { for (final IStage stage : pipeline.getStages()) { if (stage instanceof AbstractFilter<?>) { -// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) + // System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes) } } } diff --git a/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java b/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java index 98984ffd..65dfa7fd 100644 --- a/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java +++ b/src/main/java/teetime/examples/throughput/ThroughputAnalysis.java @@ -32,7 +32,7 @@ import teetime.stage.basic.ObjectProducer; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ThroughputAnalysis<T> extends Analysis { @@ -53,7 +53,7 @@ public class ThroughputAnalysis<T> extends Analysis { final IPipeline pipeline = this.buildPipeline(this.numNoopFilters); this.workerThread = new WorkerThread(pipeline, 0); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); } /** diff --git a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java index 547a02f4..e0838d6d 100644 --- a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java +++ b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java @@ -38,7 +38,7 @@ import teetime.stage.basic.ObjectProducer; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ThroughputTimestampAnalysis extends Analysis { @@ -63,7 +63,7 @@ public class ThroughputTimestampAnalysis extends Analysis { final IPipeline pipeline = this.buildPipeline(this.numNoopFilters); this.workerThread = new WorkerThread(pipeline, 0); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); } /** @@ -89,12 +89,12 @@ public class ThroughputTimestampAnalysis extends Analysis { final List<IStage> stages = new LinkedList<IStage>(); stages.add(objectProducer); - if (this.shouldUseQueue) { - stages.add(startTimestampFilter); - stages.addAll(Arrays.asList(noopFilters)); - stages.add(stopTimestampFilter); - stages.add(collectorSink); + stages.add(startTimestampFilter); + stages.addAll(Arrays.asList(noopFilters)); + stages.add(stopTimestampFilter); + stages.add(collectorSink); + if (this.shouldUseQueue) { // connect stages by pipes QueuePipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort); QueuePipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort); @@ -104,6 +104,12 @@ public class ThroughputTimestampAnalysis extends Analysis { QueuePipe.connect(noopFilters[noopFilters.length - 1].outputPort, stopTimestampFilter.inputPort); QueuePipe.connect(stopTimestampFilter.outputPort, collectorSink.objectInputPort); } else { + startTimestampFilter.setSchedulable(false); + for (NoopFilter<TimestampObject> noopFilter : noopFilters) { + noopFilter.setSchedulable(false); + } + stopTimestampFilter.setSchedulable(false); + collectorSink.setSchedulable(false); // connect stages by pipes MethodCallPipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort); MethodCallPipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort); @@ -131,7 +137,7 @@ public class ThroughputTimestampAnalysis extends Analysis { e.printStackTrace(); } - List<Long> durationPer10000IterationsInNs = workerThread.getDurationPer10000IterationsInNs(); + List<Long> durationPer10000IterationsInNs = this.workerThread.getDurationPer10000IterationsInNs(); long overallSumInNs = 0; for (int i = 0; i < durationPer10000IterationsInNs.size(); i++) { @@ -143,11 +149,11 @@ public class ThroughputTimestampAnalysis extends Analysis { sumInNs += durationPer10000IterationsInNs.get(i); } - System.out.println("Thread iterations: " + workerThread.getIterations() + " times"); + System.out.println("Thread iterations: " + this.workerThread.getIterations() + " times"); System.out.println("Thread execution time: " + TimeUnit.NANOSECONDS.toMillis(overallSumInNs) + " ms"); - System.out.println("Thread half duration/iterations: " + sumInNs / (workerThread.getIterations() / 2) + System.out.println("Thread half duration/iterations: " + sumInNs / (this.workerThread.getIterations() / 2) + " ns/iteration"); - System.out.println("Thread unsuccessfully executed stages: " + workerThread.getExecutedUnsuccessfullyCount() + System.out.println("Thread unsuccessfully executed stages: " + this.workerThread.getExecutedUnsuccessfullyCount() + " times"); } diff --git a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java index 73381658..94ed60cf 100644 --- a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -67,21 +67,24 @@ public class TraceReconstructionAnalysis extends Analysis { final List<IStage> stages = new LinkedList<IStage>(); final IPipeline pipeline = new IPipeline() { - @SuppressWarnings("unchecked") + @Override public List<? extends IStage> getStartStages() { return startStages; } + @Override public List<IStage> getStages() { return stages; } + @Override public void fireStartNotification() throws Exception { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStarts(); } } + @Override public void fireStopNotification() { for (final IStage stage : this.getStartStages()) { stage.notifyPipelineStops(); @@ -96,7 +99,7 @@ public class TraceReconstructionAnalysis extends Analysis { public void start() { super.start(); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.start(); try { diff --git a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java index a950bacd..7f2d41db 100644 --- a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java +++ b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java @@ -18,10 +18,6 @@ package teetime.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; -import kieker.analysis.plugin.filter.flow.TraceEventRecords; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.controlflow.OperationExecutionRecord; -import kieker.common.record.flow.IFlowRecord; import teetime.framework.concurrent.StageTerminationPolicy; import teetime.framework.concurrent.WorkerThread; import teetime.framework.core.Analysis; @@ -46,9 +42,14 @@ import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler; import teetime.stage.stringBuffer.handler.StringHandler; import teetime.stage.util.TextLine; +import kieker.analysis.plugin.filter.flow.TraceEventRecords; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.controlflow.OperationExecutionRecord; +import kieker.common.record.flow.IFlowRecord; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class TraceReconstructionAnalysis2 extends Analysis { @@ -150,7 +151,7 @@ public class TraceReconstructionAnalysis2 extends Analysis { public void start() { super.start(); - this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); + this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.start(); try { diff --git a/src/main/java/teetime/framework/concurrent/StageWorkArrayList.java b/src/main/java/teetime/framework/concurrent/StageWorkArrayList.java index 6f9f2c3f..a9df41eb 100644 --- a/src/main/java/teetime/framework/concurrent/StageWorkArrayList.java +++ b/src/main/java/teetime/framework/concurrent/StageWorkArrayList.java @@ -23,7 +23,7 @@ import teetime.framework.core.IStage; /** * @author Christian Wulf - * + * * @since 1.10 */ public class StageWorkArrayList implements IStageWorkList { @@ -75,7 +75,7 @@ public class StageWorkArrayList implements IStageWorkList { } private void push(final IStage stage) { - if (this.isValid(stage)) { + if (stage.isSchedulable() && this.isValid(stage)) { this.firstIndex = Math.min(stage.getSchedulingIndex(), this.firstIndex); this.lastIndex = Math.max(stage.getSchedulingIndex(), this.lastIndex); this.stages[stage.getSchedulingIndex()].numToBeExecuted++; diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java index d1dff2d1..a0f7eaf9 100644 --- a/src/main/java/teetime/framework/concurrent/WorkerThread.java +++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java @@ -34,7 +34,7 @@ import teetime.util.StopWatch; /** * @author Christian Wulf - * + * * @since 1.10 */ public class WorkerThread extends Thread { @@ -182,8 +182,8 @@ public class WorkerThread extends Thread { } private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) { -// System.out.println("executeTerminationPolicy executedStage=" + executedStage + ", executedSuccessfully=" + executedSuccessfully); -// System.out.println("executeTerminationPolicy areAllInputPortsClosed(executedStage)=" + this.stageStateManager.areAllInputPortsClosed(executedStage)); + // System.out.println("executeTerminationPolicy executedStage=" + executedStage + ", executedSuccessfully=" + executedSuccessfully); + // System.out.println("executeTerminationPolicy areAllInputPortsClosed(executedStage)=" + this.stageStateManager.areAllInputPortsClosed(executedStage)); switch (this.terminationPolicy) { case TERMINATE_STAGE_AFTER_NEXT_EXECUTION: @@ -243,21 +243,9 @@ public class WorkerThread extends Thread { return this.pipeline; } - // BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically - // disables itself if it has no input ports - public void terminate(final StageTerminationPolicy terminationPolicyToUse) { -// for (final IStage startStage : this.pipeline.getStartStages()) { -// if (this.stageStateManager.areAllInputPortsClosed(startStage)) { -// startStage.fireSignalClosingToAllInputPorts(); -// } -// } - - this.setTerminationPolicy(terminationPolicyToUse); - } - /** * If not set, this thread will run infinitely. - * + * * @param terminationPolicyToUse */ public void setTerminationPolicy(final StageTerminationPolicy terminationPolicyToUse) { diff --git a/src/main/java/teetime/framework/core/AbstractFilter.java b/src/main/java/teetime/framework/core/AbstractFilter.java index 6222eabb..50dd6e9d 100644 --- a/src/main/java/teetime/framework/core/AbstractFilter.java +++ b/src/main/java/teetime/framework/core/AbstractFilter.java @@ -25,20 +25,20 @@ import java.util.List; import teetime.util.concurrent.workstealing.exception.DequePopException; /** - * + * * @author Christian Wulf - * + * * @since 1.10 - * + * * @param <S> * the extending stage - * + * */ public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource { /** * @author Christian Wulf - * + * * @since 1.10 */ public enum StageState { @@ -53,6 +53,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp private int depth = IStage.DEPTH_NOT_SET; private int schedulingIndex; + private boolean schedulable = true; private final List<IInputPort<S, ?>> inputPorts = new ArrayList<IInputPort<S, ?>>(); private final List<IInputPort<S, ?>> readOnlyInputPorts = Collections.unmodifiableList(this.inputPorts); @@ -131,7 +132,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is started. - * + * * @throws Exception * @since 1.10 */ @@ -162,7 +163,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp /** * This method is called exactly once iff the pipeline is stopped. - * + * * @since 1.10 */ public void onPipelineStops() { @@ -271,4 +272,13 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp public void setSchedulingIndex(final int schedulingIndex) { this.schedulingIndex = schedulingIndex; } + + @Override + public boolean isSchedulable() { + return this.schedulable; + } + + public void setSchedulable(final boolean schedulable) { + this.schedulable = schedulable; + } } diff --git a/src/main/java/teetime/framework/core/CompositeFilter.java b/src/main/java/teetime/framework/core/CompositeFilter.java index 353913b0..113befe4 100644 --- a/src/main/java/teetime/framework/core/CompositeFilter.java +++ b/src/main/java/teetime/framework/core/CompositeFilter.java @@ -16,7 +16,9 @@ package teetime.framework.core; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author Christian Wulf @@ -27,7 +29,17 @@ public abstract class CompositeFilter implements IBaseStage { protected final List<IBaseStage> schedulableStages = new ArrayList<IBaseStage>(); + private final Map<IOutputPort<?, ?>, IInputPort<?, ?>> connections = new HashMap<IOutputPort<?, ?>, IInputPort<?, ?>>(); + public List<IBaseStage> getSchedulableStages() { return this.schedulableStages; } + + protected <T, S1 extends ISink<S1>, S0 extends ISource> void connectWithPipe(final IOutputPort<S0, T> sourcePort, final IInputPort<S1, T> targetPort) { + this.connections.put(sourcePort, targetPort); + } + + public Map<IOutputPort<?, ?>, IInputPort<?, ?>> getConnections() { + return this.connections; + } } diff --git a/src/main/java/teetime/framework/core/IStage.java b/src/main/java/teetime/framework/core/IStage.java index bda94d98..5d7aba44 100644 --- a/src/main/java/teetime/framework/core/IStage.java +++ b/src/main/java/teetime/framework/core/IStage.java @@ -21,7 +21,7 @@ import java.util.List; /** * @author Christian Wulf - * + * * @since 1.10 */ public interface IStage extends IBaseStage { @@ -36,11 +36,11 @@ public interface IStage extends IBaseStage { /** * @return <code>true</code> if the execution took enough tokens from the input ports so that the stage made progress due to this execution, <code>false</code> * otherwise. The definition of <i>progress</i> depends on the semantics of the particular stage. - * + * * <p> * Example usage: * </p> - * + * * <pre> * <code> * boolean execute() { @@ -53,7 +53,7 @@ public interface IStage extends IBaseStage { * } * </code> * </pre> - * + * * @since 1.10 */ boolean execute(); @@ -86,7 +86,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Used by the scheduler</i> - * + * * @since 1.10 */ Context<?> getContext(); @@ -113,7 +113,7 @@ public interface IStage extends IBaseStage { /** * <i>Hint: Only needed by stage schedulers.</i> - * + * * @return */ public Collection<? extends IStage> getAllOutputStages(); @@ -157,4 +157,9 @@ public interface IStage extends IBaseStage { */ public void setSchedulingIndex(int schedulingIndex); + /** + * @since 1.10 + */ + public boolean isSchedulable(); + } diff --git a/src/main/java/teetime/stage/kieker/File2RecordFilter.java b/src/main/java/teetime/stage/kieker/File2RecordFilter.java index 0fc986d8..09e9a733 100644 --- a/src/main/java/teetime/stage/kieker/File2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/File2RecordFilter.java @@ -17,15 +17,11 @@ package teetime.stage.kieker; import java.io.File; -import kieker.common.record.IMonitoringRecord; -import kieker.common.util.filesystem.BinaryCompressionMethod; -import kieker.common.util.filesystem.FSUtil; import teetime.framework.concurrent.ConcurrentWorkStealingPipe; import teetime.framework.concurrent.ConcurrentWorkStealingPipeFactory; import teetime.framework.core.CompositeFilter; import teetime.framework.core.IInputPort; import teetime.framework.core.IOutputPort; -import teetime.framework.sequential.MethodCallPipe; import teetime.stage.FileExtensionFilter; import teetime.stage.basic.merger.Merger; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; @@ -36,9 +32,13 @@ import teetime.stage.kieker.fileToRecord.ZipFile2RecordFilter; import teetime.stage.predicate.IsDirectoryPredicate; import teetime.stage.predicate.PredicateFilter; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.BinaryCompressionMethod; +import kieker.common.util.filesystem.FSUtil; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class File2RecordFilter extends CompositeFilter { @@ -55,7 +55,7 @@ public class File2RecordFilter extends CompositeFilter { public File2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { this.classNameRegistryRepository = classNameRegistryRepository; - // FIXME does not yet work with more than one thread due to classNameRegistryRepository + // FIXME does not yet work with more than one thread due to classNameRegistryRepository (reason not comprehensible) // create stages final PredicateFilter<File> isDirectoryFilter = new PredicateFilter<File>(new IsDirectoryPredicate()); final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository); @@ -76,12 +76,12 @@ public class File2RecordFilter extends CompositeFilter { final IOutputPort<FileExtensionFilter, File> zipFileOutputPort = fileExtensionFilter.createOutputPortForFileExtension(FSUtil.ZIP_FILE_EXTENSION); // connect ports by pipes - MethodCallPipe.connect(isDirectoryFilter.matchingOutputPort, classNameRegistryCreationFilter.directoryInputPort); - MethodCallPipe.connect(isDirectoryFilter.mismatchingOutputPort, fileMerger.getNewInputPort()); // BETTER restructure pipeline - MethodCallPipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort); - MethodCallPipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort); - MethodCallPipe.connect(directory2FilesFilter.fileOutputPort, fileExtensionFilter.fileInputPort); - MethodCallPipe.connect(zipFileOutputPort, fileMerger.getNewInputPort()); + this.connectWithPipe(isDirectoryFilter.matchingOutputPort, classNameRegistryCreationFilter.directoryInputPort); + this.connectWithPipe(isDirectoryFilter.mismatchingOutputPort, fileMerger.getNewInputPort()); // BETTER restructure pipeline + this.connectWithPipe(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort); + this.connectWithPipe(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort); + this.connectWithPipe(directory2FilesFilter.fileOutputPort, fileExtensionFilter.fileInputPort); + this.connectWithPipe(zipFileOutputPort, fileMerger.getNewInputPort()); final ConcurrentWorkStealingPipeFactory<File> concurrentWorkStealingPipeFactory0 = new ConcurrentWorkStealingPipeFactory<File>(); final ConcurrentWorkStealingPipe<File> concurrentWorkStealingPipe0 = concurrentWorkStealingPipeFactory0.create(); diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysisTest.java index f6d10d99..f4a3e6e1 100644 --- a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysisTest.java +++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysisTest.java @@ -19,8 +19,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; -import kieker.common.logging.LogFactory; - import org.junit.Before; import org.junit.Test; @@ -28,14 +26,17 @@ import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; +import kieker.common.logging.LogFactory; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThoughputTimestampAnalysisTest { private static final int NUM_OBJECTS_TO_CREATE = 100000; + private static final int NUM_NOOP_FILTERS = 800; @Before public void before() { @@ -47,11 +48,13 @@ public class MethodCallThoughputTimestampAnalysisTest { @Test public void testWithManyObjects() { + System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + + NUM_NOOP_FILTERS + "..."); final StopWatch stopWatch = new StopWatch(); final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE); final MethodCallThroughputAnalysis analysis = new MethodCallThroughputAnalysis(); - analysis.setNumNoopFilters(800); + analysis.setNumNoopFilters(NUM_NOOP_FILTERS); analysis.setTimestampObjects(timestampObjects); analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { @Override diff --git a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java index be0e8e86..51cd195f 100644 --- a/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java +++ b/src/test/java/teetime/examples/throughput/ThroughputTimestampAnalysisTest.java @@ -29,7 +29,7 @@ import kieker.common.logging.LogFactory; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ThroughputTimestampAnalysisTest { -- GitLab