diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 129e6553113110c92ede9914e509ee4d4c68a676..6aba28e5986cd8348751ed6d3c025c915ad7b0d2 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Wed May 13 15:54:28 CEST 2015 +#Fri Jun 12 08:15:02 CEST 2015 detector_threshold=2 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 1b6d66eea209bdb3465f78d0a73fc0b33cb8dee9..61b694ea33eae506552220642f122d535ee1b6c2 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -5,71 +5,88 @@ <title>Release Notes</title> </properties> <body> - <release version="Snapshot" date="Daily basis" description="Unstable preview of oncoming versions"> + <release version="Snapshot" date="Daily basis" + description="Unstable preview of oncoming versions"> <action dev="ntd" type="add" issue="33"> - TeeTime automatically chooses the correct type of pipe for all connections. + TeeTime automatically + chooses the correct type of pipe for all connections. </action> <action dev="ntd" type="fix" issue="93"> - Introduced a new concept for composing stages. + Introduced a new concept + for composing stages. </action> <action dev="ntd" type="remove"> Marked Pair class as deprecated. </action> + <action dev="ntd" type="add" issue="154"> + All stages will be + initialized before starting the analysis. + </action> </release> - + <release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1"> <action dev="chw" due-to="Nils C. Ehmke" type="fix" issue="151"> Solved a bug in the merger stage. </action> </release> - + <release version="1.1.1" date="06.05.2015" description="Minor bugfixes for 1.1"> <action dev="ntd" due-to="Nils C. Ehmke" type="fix" issue="151"> Solved a bug which led to a NullPointerExceptions. </action> <action dev="ntd" type="update" issue="102"> - Removed deprecated methods. + Removed deprecated + methods. </action> - + </release> - + <release version="1.1" date="30.04.2015" description="Second release"> <action dev="ntd" type="add" issue="32"> - New concept: exception handling incl. Wiki tutorial. + New concept: exception + handling incl. Wiki tutorial. </action> <action due-to="Nils C. Ehmke" type="add" issue="107"> - New concept: unit test framework for testing a single stage. + New concept: + unit test framework for testing a single stage. </action> <action dev="chw" type="add"> - New class: AbstractTransformation; + New class: AbstractTransformation; Represents a stage with a single input and a single output port. </action> <action dev="chw" type="add"> New class: AbstractFilter; - Represents a stage with a single input and a single output port of the same type. + Represents a + stage with a single input and a single output port of the same type. </action> - - + + <action dev="ntd" type="update" issue="92"> - Analysis.start() is now deprecated. Use Analysis.execute() instead. + Analysis.start() is now + deprecated. Use Analysis.execute() instead. </action> <action due-to="Arne J. Salveter" type="update" issue="120"> - Renamed Stage.executeWithPorts() to Stage.executeStage(). + Renamed + Stage.executeWithPorts() to Stage.executeStage(). </action> <action dev="ntd" type="update" issue="112"> - Removed IterableProducer. Use InitialElementProducer instead. + Removed + IterableProducer. Use InitialElementProducer instead. </action> - - + + <action dev="chw" type="fix" issue="143"> - #143 Null values can block the analysis. + #143 Null values can block + the analysis. </action> <action dev="ntd" type="fix" issue="109"> - #109 Minor bug in ObjectProducer stage. + #109 Minor bug in + ObjectProducer stage. </action> <action dev="ntd" type="fix" issue="75"> - #75 Signal passing is incorrect. + #75 Signal passing is + incorrect. </action> @@ -77,10 +94,12 @@ Updated dependencies. </action> <action dev="ntd" type="update" issue="72"> - Jar is not only published via the Central Maven Repository, but also via our CI server Jenkins. + Jar is not only + published via the Central Maven Repository, but also via our CI + server Jenkins. </action> </release> - + <release version="1.0" date="19.12.2014" description="Initial release"> <action dev="ntd" type="add" issue="66"> Created a new site to diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 4deee9e7b64f5eb96fbf393e55cc879b2ca0c023..acb1752ef08afe1795d079ee994326d5c91ff555 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; +import teetime.framework.signal.InitializingSignal; import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; @@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { cachedTargetStage.onSignal(signal, getTargetPort()); } + @Override + public final void waitForInitializingSignal() throws InterruptedException { + final ISignal signal = signalQueue.take(); + if (!(signal instanceof InitializingSignal)) { + throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal"); + } + cachedTargetStage.onSignal(signal, getTargetPort()); + } + @Override public final boolean isClosed() { return isClosed; diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index b3e47b9cf8e333a5f26cc55d70d7b7234b310e88..ccfce741b2945ac94d707b34d6a48a0eaf399f3b 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { public void waitForStartSignal() throws InterruptedException { // do nothing } + + @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") + @Override + public void waitForInitializingSignal() throws InterruptedException { + // do nothing + } } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index a5d537c2c89629975cfbdb6d62801bda249e010b..535503166b3b564d2a1fa106f51e9daf992993a9 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -25,7 +25,7 @@ abstract class AbstractRunnableStage implements Runnable { private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: "; - private final Stage stage; + protected final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; @@ -39,16 +39,16 @@ abstract class AbstractRunnableStage implements Runnable { this.logger.debug("Executing runnable stage..."); boolean failed = false; try { - beforeStageExecution(stage); + beforeStageExecution(); try { do { - executeStage(stage); + executeStage(); } while (!stage.shouldBeTerminated()); } catch (StageException e) { this.stage.terminate(); failed = true; } - afterStageExecution(stage); + afterStageExecution(); } catch (RuntimeException e) { this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e); @@ -72,10 +72,10 @@ abstract class AbstractRunnableStage implements Runnable { } - protected abstract void beforeStageExecution(Stage stage) throws InterruptedException; + protected abstract void beforeStageExecution() throws InterruptedException; - protected abstract void executeStage(Stage stage); + protected abstract void executeStage(); - protected abstract void afterStageExecution(Stage stage); + protected abstract void afterStageExecution(); } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 9beeb874950b9b1029224aedb716417216be2a25..50e4f96f2b963cb38de20df715375b45829d04ad 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -75,6 +75,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private int createdConnections = 0; + private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); + /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. * @@ -130,10 +132,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * */ private final void init() { - if (initialized) { - return; - } - initialized = true; instantiatePipes(); @@ -150,6 +148,12 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught initializeIntraStages(intraStages, thread, newListener); } + startThreads(this.consumerThreads); + startThreads(this.finiteProducerThreads); + startThreads(this.infiniteProducerThreads); + + sendInitializingSignal(); + } private Thread initializeThreadableStages(final Stage stage) { @@ -165,6 +169,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_SELF_DECISION: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); this.finiteProducerThreads.add(thread); InitializingSignal initializingSignal = new InitializingSignal(); @@ -173,6 +178,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); InitializingSignal initializingSignal = new InitializingSignal(); stage.onSignal(initializingSignal, null); @@ -315,9 +321,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * @since 1.1 */ public void executeNonBlocking() { - startThreads(this.consumerThreads); - startThreads(this.finiteProducerThreads); - startThreads(this.infiniteProducerThreads); + sendStartingSignal(); } private void startThreads(final Iterable<Thread> threads) { @@ -326,6 +330,18 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } } + private void sendInitializingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + runnable.triggerInitializingSignal(); + } + } + + private void sendStartingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + runnable.triggerStartingSignal(); + } + } + /** * Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis * diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index 2c2fee6f833fd6fc8476962a9ea3cf371845366b..f6e2a30d170607b2f237ec57a4145810ce6f2024 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> { pipe.waitForStartSignal(); } + public void waitForInitializingSignal() throws InterruptedException { + pipe.waitForInitializingSignal(); + }; + } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index ded489ebed6581caff6da9b0cc4ae39c82fb20cb..c1a78e20917d21a7052dd47f66fd3ed18e3f023b 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -42,19 +42,19 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @SuppressWarnings("PMD.GuardLogStatement") @Override - protected void beforeStageExecution(final Stage stage) throws InterruptedException { - logger.trace("Waiting for start signals..." + inputPorts); + protected void beforeStageExecution() throws InterruptedException { + logger.trace("Waiting for start signals... " + stage); for (InputPort<?> inputPort : inputPorts) { - inputPort.waitForStartSignal(); + inputPort.waitForInitializingSignal(); } for (InputPort<?> inputPort : inputPorts) { inputPort.waitForStartSignal(); } - logger.trace("Starting..." + stage); + logger.trace("Starting... " + stage); } @Override - protected void executeStage(final Stage stage) { + protected void executeStage() { try { stage.executeStage(); } catch (NotEnoughInputException e) { @@ -73,7 +73,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } @Override - protected void afterStageExecution(final Stage stage) { + protected void afterStageExecution() { final ISignal signal = new TerminatingSignal(); for (InputPort<?> inputPort : inputPorts) { stage.onSignal(signal, inputPort); diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 2b7b556af36be9f5dd00b4c827c48dea1b831dc5..f8e9eb482df8956166be0ed421d1b51472d4ab9d 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -15,30 +15,53 @@ */ package teetime.framework; +import java.util.concurrent.Semaphore; + +import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; -public final class RunnableProducerStage extends AbstractRunnableStage { +final class RunnableProducerStage extends AbstractRunnableStage { + + private final Semaphore startSemaphore = new Semaphore(0); + private final Semaphore initSemaphore = new Semaphore(0); public RunnableProducerStage(final Stage stage) { super(stage); } @Override - protected void beforeStageExecution(final Stage stage) { - final StartingSignal startingSignal = new StartingSignal(); - stage.onSignal(startingSignal, null); + protected void beforeStageExecution() throws InterruptedException { + waitForInitializingSignal(); + this.stage.onSignal(new InitializingSignal(), null); + waitForStartingSignal(); + this.stage.onSignal(new StartingSignal(), null); } @Override - protected void executeStage(final Stage stage) { - stage.executeStage(); + protected void executeStage() { + this.stage.executeStage(); } @Override - protected void afterStageExecution(final Stage stage) { + protected void afterStageExecution() { final TerminatingSignal terminatingSignal = new TerminatingSignal(); - stage.onSignal(terminatingSignal, null); + this.stage.onSignal(terminatingSignal, null); } + public void triggerInitializingSignal() { + initSemaphore.release(); + } + + public void triggerStartingSignal() { + startSemaphore.release(); + } + + public void waitForInitializingSignal() throws InterruptedException { + initSemaphore.acquire(); + } + + public void waitForStartingSignal() throws InterruptedException { + startSemaphore.acquire(); + } } diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 23031fe477fcca371ecca1ba43dbfc73a9527c62..2fe5d8833e237931be44dc2aa69ab3f3b4d8606b 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe { } + @Override + public void waitForInitializingSignal() throws InterruptedException { + + } + @Override public void close() { diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index f2fd15ecd4adf2a8f72f8d53e3f3fec9a9bf9a37..183aac72c8b50343ca9939dd1daf7ae7b4aa6bf7 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -92,6 +92,8 @@ public interface IPipe { void waitForStartSignal() throws InterruptedException; + void waitForInitializingSignal() throws InterruptedException; + void close(); } diff --git a/src/main/java/teetime/stage/Counter.java b/src/main/java/teetime/stage/Counter.java index 0dcfc2d54eaf34750fca64c430fa82dbc294de4c..9bfc67c23c9a6994b9e12d1d92bf3f25e7910705 100644 --- a/src/main/java/teetime/stage/Counter.java +++ b/src/main/java/teetime/stage/Counter.java @@ -31,8 +31,10 @@ public final class Counter<T> extends AbstractConsumerStage<T> { outputPort.send(element); } - // BETTER find a solution w/o any thread-safe code in this stage - public synchronized int getNumElementsPassed() { + /** + * <i>Hint:</i> This method may not be invoked by another thread since it is not thread-safe. + */ + public int getNumElementsPassed() { return this.numElementsPassed; } diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4f2120ac2207c57286c23b61e0d91a09b5b296b6 --- /dev/null +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -0,0 +1,27 @@ +package teetime.framework; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class RunnableProducerStageTest { + + @Test + public void testInit() { + RunnableTestStage testStage = new RunnableTestStage(); + RunnableProducerStage runnable = new RunnableProducerStage(testStage); + Thread thread = new Thread(runnable); + thread.start(); + // Not running and not initialized + assertFalse(testStage.executed && testStage.initialized); + runnable.triggerInitializingSignal(); + // Not running, but initialized + assertFalse(testStage.executed && !testStage.initialized); + runnable.triggerStartingSignal(); + while (!(testStage.getCurrentState() == StageState.TERMINATED)) { + Thread.yield(); + } + assertTrue(testStage.executed); + } +} diff --git a/src/test/java/teetime/framework/RunnableTestStage.java b/src/test/java/teetime/framework/RunnableTestStage.java new file mode 100644 index 0000000000000000000000000000000000000000..47f4cd0c31f542a2e8b40a9626cb001b47ec4c6f --- /dev/null +++ b/src/test/java/teetime/framework/RunnableTestStage.java @@ -0,0 +1,24 @@ +package teetime.framework; + +class RunnableTestStage extends AbstractProducerStage<Object> { + + boolean executed, initialized; + + @Override + protected void executeStage() { + executed = true; + this.terminate(); + } + + @Override + protected void execute() { + + } + + @Override + public void onInitializing() throws Exception { + super.onInitializing(); + initialized = true; + } + +} diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 42696bf77420db1cbc4693771311b38ef5d47152..51c82d813ae7f7f70674160f60e9cc998fa124c8 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe { @Override public void waitForStartSignal() throws InterruptedException {} + @Override + public void waitForInitializingSignal() throws InterruptedException {} + @Override public void close() {}