diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index ea3e4a51534a26c1a1cb963891c1838d4fcd02ed..6428bfcdf4bd941fab0b1cff6b8148f19fc75747 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Tue Nov 04 15:49:25 CET 2014 +#Tue Nov 18 10:57:28 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index d6a4a4845fe4388f33ac398ffba3655d883b7116..5b8a5d846c85db96df37c7bcbda1dc0f8a040823 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -33,6 +33,7 @@ public abstract class AbstractStage implements Stage { protected OutputPort<?>[] cachedOutputPorts; private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>(); + private boolean shouldTerminate; public AbstractStage() { this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name @@ -124,8 +125,8 @@ public abstract class AbstractStage implements Stage { this.connectUnconnectedOutputPorts(); } - public void onTerminating() throws Exception { // NOPMD - // empty default implementation + public void onTerminating() throws Exception { + terminate(); } protected <T> InputPort<T> createInputPort() { @@ -162,4 +163,19 @@ public abstract class AbstractStage implements Stage { return this.getClass().getName() + ": " + this.id; } + @Override + public void terminate() { + this.shouldTerminate = true; + } + + @Override + public boolean shouldBeTerminated() { + return this.shouldTerminate; + } + + @Override + public TerminationStrategy getTerminationStrategy() { + return TerminationStrategy.BY_SIGNAL; + } + } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index c9fd8e112c7a8cfb9e2be4f2df0de17ac660054b..40aad3696585c5ba634b637563518c5c47a7cf2f 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -28,17 +28,17 @@ public class Analysis implements UncaughtExceptionHandler { } public void init() { - for (HeadStage stage : this.configuration.getConsumerStages()) { + for (Stage stage : this.configuration.getConsumerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.consumerThreads.add(thread); } - for (HeadStage stage : this.configuration.getFiniteProducerStages()) { + for (Stage stage : this.configuration.getFiniteProducerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.finiteProducerThreads.add(thread); } - for (HeadStage stage : this.configuration.getInfiniteProducerStages()) { + for (Stage stage : this.configuration.getInfiniteProducerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.infiniteProducerThreads.add(thread); } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index c051594442e7982ccad776bf71597a6aaeb95547..434410124a93abf1535b1f12f560a08acdf3fd1e 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -1,7 +1,9 @@ package teetime.framework; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import teetime.framework.pipe.PipeFactoryRegistry; @@ -9,20 +11,51 @@ public class AnalysisConfiguration { protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; - private final List<HeadStage> consumerStages = new LinkedList<HeadStage>(); - private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>(); - private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>(); + private final List<Runnable> threadableStageJobs = new LinkedList<Runnable>(); - public List<HeadStage> getConsumerStages() { + private final Set<Stage> consumerStages = new HashSet<Stage>(); + private final Set<Stage> finiteProducerStages = new HashSet<Stage>(); + private final Set<Stage> infiniteProducerStages = new HashSet<Stage>(); + + public Set<Stage> getConsumerStages() { return this.consumerStages; } - public List<HeadStage> getFiniteProducerStages() { + public Set<Stage> getFiniteProducerStages() { return this.finiteProducerStages; } - public List<HeadStage> getInfiniteProducerStages() { + public Set<Stage> getInfiniteProducerStages() { return this.infiniteProducerStages; } + public void addThreadableStage(final Stage stage) { + // wrap the stage categorization in a runnable + // because the termination strategy could depend on port configuration that is set later + final Runnable addThreadableStageJob = new Runnable() { + @Override + public void run() { + switch (stage.getTerminationStrategy()) { + case BY_SIGNAL: + consumerStages.add(stage); + break; + case BY_SELF_DECISION: + finiteProducerStages.add(stage); + break; + case BY_INTERRUPT: + infiniteProducerStages.add(stage); + break; + } + } + }; + + threadableStageJobs.add(addThreadableStageJob); + } + + void init() { + for (Runnable job : threadableStageJobs) { + job.run(); + } + } + } diff --git a/src/main/java/teetime/framework/ProducerStage.java b/src/main/java/teetime/framework/ProducerStage.java index ac3dbd1f078dfe477e5029bae8f83a1e2eba5ae6..02348cd9bd4a1efaa2aa6b12b5612bb9628b25ca 100644 --- a/src/main/java/teetime/framework/ProducerStage.java +++ b/src/main/java/teetime/framework/ProducerStage.java @@ -9,10 +9,9 @@ package teetime.framework; * the type of the default output port * */ -public abstract class ProducerStage<O> extends AbstractStage implements HeadStage { +public abstract class ProducerStage<O> extends AbstractStage implements Stage { protected final OutputPort<O> outputPort = this.createOutputPort(); - private boolean shouldTerminate; public final OutputPort<O> getOutputPort() { return this.outputPort; @@ -24,13 +23,8 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag } @Override - public void terminate() { - this.shouldTerminate = true; - } - - @Override - public boolean shouldBeTerminated() { - return this.shouldTerminate; + public TerminationStrategy getTerminationStrategy() { + return TerminationStrategy.BY_SELF_DECISION; } protected abstract void execute(); diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index f269740da0c3a5a109eeafd4cf0087219f8b4dd6..8103bd7efe5d0cb638645287f8589978b805ad5d 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -11,11 +11,11 @@ import teetime.framework.validation.AnalysisNotValidException; @SuppressWarnings("PMD.BeanMembersShouldSerialize") public class RunnableStage implements Runnable { - private final HeadStage stage; + private final Stage stage; private final Logger logger; // NOPMD private boolean validationEnabled; - public RunnableStage(final HeadStage stage) { + public RunnableStage(final Stage stage) { this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 3f0798ac9bd35ed3ad195e1ec1f5feefdc973fb1..55d676b994eb8af5a312f624f36a7c4b18a34a36 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -5,7 +5,7 @@ import java.util.List; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; -public interface Stage { +public interface Stage extends Terminable { String getId(); diff --git a/src/main/java/teetime/framework/HeadStage.java b/src/main/java/teetime/framework/Terminable.java similarity index 53% rename from src/main/java/teetime/framework/HeadStage.java rename to src/main/java/teetime/framework/Terminable.java index 5e8de76568b84801027f947e8fadfbd214b39891..fd606b2f2f66c6e48ae7c5d7154629de7af31c53 100644 --- a/src/main/java/teetime/framework/HeadStage.java +++ b/src/main/java/teetime/framework/Terminable.java @@ -1,8 +1,11 @@ package teetime.framework; -public interface HeadStage extends Stage { +interface Terminable { - boolean shouldBeTerminated(); + TerminationStrategy getTerminationStrategy(); void terminate(); + + boolean shouldBeTerminated(); + } diff --git a/src/main/java/teetime/framework/TerminationStrategy.java b/src/main/java/teetime/framework/TerminationStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..eef7d78714c01930938ad098ebff14a6ec49ae89 --- /dev/null +++ b/src/main/java/teetime/framework/TerminationStrategy.java @@ -0,0 +1,5 @@ +package teetime.framework; + +public enum TerminationStrategy { + BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT +} diff --git a/src/main/java/teetime/stage/Clock.java b/src/main/java/teetime/stage/Clock.java index e9439886d9c0ac1d5df7d225d43ec46cb49d753f..5cf5375016107c23123f056c047c0517d1b24b97 100644 --- a/src/main/java/teetime/stage/Clock.java +++ b/src/main/java/teetime/stage/Clock.java @@ -1,6 +1,7 @@ package teetime.stage; import teetime.framework.ProducerStage; +import teetime.framework.TerminationStrategy; public class Clock extends ProducerStage<Long> { @@ -9,6 +10,11 @@ public class Clock extends ProducerStage<Long> { private long initialDelayInMs; private long intervalDelayInMs; + @Override + public TerminationStrategy getTerminationStrategy() { + return TerminationStrategy.BY_INTERRUPT; + } + @Override protected void execute() { if (!this.initialDelayExceeded) { diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java index 390598a17bc13d0a5c9dbbb5a1ba684dc338235a..aa3376877b246814d1b34c160f3deae05025a26c 100644 --- a/src/main/java/teetime/stage/io/File2ByteArray.java +++ b/src/main/java/teetime/stage/io/File2ByteArray.java @@ -4,12 +4,12 @@ import java.io.File; import java.io.IOException; import teetime.framework.ConsumerStage; -import teetime.framework.HeadStage; import teetime.framework.OutputPort; +import teetime.framework.Stage; import com.google.common.io.Files; -public class File2ByteArray extends ConsumerStage<File> implements HeadStage { +public class File2ByteArray extends ConsumerStage<File> implements Stage { private final OutputPort<byte[]> outputPort = this.createOutputPort(); diff --git a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java index 882af15ee9f929163877da0ce189d88c7bb99c95..785ef39d4914adb9d7449dd2e0b65a3605322e5e 100644 --- a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -18,8 +18,8 @@ package teetime.examples.experiment09; import java.util.List; import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.CommittablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 { private Runnable runnable; public void init() { - HeadStage pipeline = this.buildPipeline(); + Stage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java index e24c7d487f0c37e794583a33dcad8338f9ee1d12..31e7dfbdd4895bf17d6cb9fbea303dbb9347240b 100644 --- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -18,8 +18,8 @@ package teetime.examples.experiment11; import java.util.List; import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 { private Runnable runnable; public void init() { - HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); + Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java index 01de043b0a339896b44c16264095174723711afa..f744ee4ac333db997b76afcc860c2c8d17d04af0 100644 --- a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -18,8 +18,8 @@ package teetime.examples.experiment14; import java.util.List; import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 { private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE; public void init() { - HeadStage pipeline = this.buildPipeline(); + Stage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index 6b92bec5038741b7f4356ae676f6d6c187e0ad37..df9e7a4fb89b3b9656747d177d6c3c87df95ac6b 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -18,8 +18,8 @@ package teetime.examples.experiment15; import java.util.List; import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); this.clockRunnable = new RunnableStage(clockPipeline); - HeadStage pipeline = this.buildPipeline(this.clock); + Stage pipeline = this.buildPipeline(this.clock); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/framework/OldHeadPipeline.java b/src/performancetest/java/teetime/framework/OldHeadPipeline.java index d3dd64147f40dc4a77f07ba5008aa1b542a35dfd..4d166117c90a0dc97fe7373458c39a22f077c9f7 100644 --- a/src/performancetest/java/teetime/framework/OldHeadPipeline.java +++ b/src/performancetest/java/teetime/framework/OldHeadPipeline.java @@ -1,7 +1,7 @@ package teetime.framework; @Deprecated -public class OldHeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage { +public class OldHeadPipeline<FirstStage extends Stage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements Stage { public OldHeadPipeline() {} diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 11ff018d9957535db87d6ac2d45ce099ab074b81..e6c8244b8e80009fa15a43aa850f58c30590f97d 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -57,4 +57,22 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> impl this.lastStage.validateOutputPorts(invalidPortConnections); } + @Override + public TerminationStrategy getTerminationStrategy() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void terminate() { + // TODO Auto-generated method stub + + } + + @Override + public boolean shouldBeTerminated() { + // TODO Auto-generated method stub + return false; + } + } diff --git a/src/test/java/teetime/framework/AnalysisConfigurationTest.java b/src/test/java/teetime/framework/AnalysisConfigurationTest.java new file mode 100644 index 0000000000000000000000000000000000000000..265e3991e36e8a91a1d5458ccf943eb23afd44ad --- /dev/null +++ b/src/test/java/teetime/framework/AnalysisConfigurationTest.java @@ -0,0 +1,33 @@ +package teetime.framework; + +import org.junit.Assert; +import org.junit.Test; + +import teetime.stage.Clock; +import teetime.stage.Counter; +import teetime.stage.InitialElementProducer; + +public class AnalysisConfigurationTest { + + @Test + public void checkIfCorrectAdded() { + AnalysisConfiguration config = new AnalysisConfiguration(); + + // Consumer -> BY_SIGNAL + Counter<String> counter = new Counter<String>(); + config.addThreadableStage(counter); + + // Infinite producer -> BY_INTERRUPT + Clock clock = new Clock(); + config.addThreadableStage(clock); + + // Finite Producer -> BY_SELF_DECISION + InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(1, 2, 3, 4); + config.addThreadableStage(producer); + + config.init(); + Assert.assertTrue(config.getConsumerStages().remove(counter)); + Assert.assertTrue(config.getInfiniteProducerStages().remove(clock)); + Assert.assertTrue(config.getFiniteProducerStages().remove(producer)); + } +}