diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index b3f25a75acbb0c255cdcbb6c1a0c5e3550650f97..f86e7f636534f275f3da78567becb1cbb11ce544 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; @@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @Override - public final void waitForInitializingSignal() throws InterruptedException { - final ISignal signal = signalQueue.take(); - if (!(signal instanceof InitializingSignal)) { - throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName()); - } - cachedTargetStage.onSignal(signal, getTargetPort()); - } - @Override public final void waitForStartSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 2be7f3e39aba05e91c22fe8226bc464a378a6e00..3c8f7c6c073b3e72948ab61c9d38ec9428f72b90 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") - @Override - public void waitForInitializingSignal() throws InterruptedException { - // do nothing - } } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index e2f87810a016fdadbf0e6221666c7d2937afa194..d9116d45cb552c56a3fc7c6ec96f6898c4dcf7e7 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -49,9 +49,9 @@ abstract class AbstractRunnableStage implements Runnable { throw new IllegalArgumentException("Argument stage may not have a nullable owning context"); } try { - do { + while (!stage.shouldBeTerminated()) { executeStage(); - } while (!stage.shouldBeTerminated()); + } } catch (TerminateException e) { this.stage.terminate(); stage.getOwningContext().abortConfigurationRun(); @@ -65,13 +65,16 @@ abstract class AbstractRunnableStage implements Runnable { } catch (InterruptedException e) { this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e); } - } finally { + } finally + + { if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { stage.getOwningContext().getThreadService().getRunnableCounter().dec(); } } logger.debug("Finished runnable stage. (" + stage.getId() + ")"); + } protected abstract void beforeStageExecution() throws InterruptedException; diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index ec7070130bc0f6c3e3a41361b9282c192938f085..da8144202a25ec4c17f880f3b3b0e499ad3e7d38 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -55,7 +55,11 @@ public abstract class AbstractStage extends Stage { @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { - signal.trigger(this); + try { + signal.trigger(this); + } catch (Exception e) { + this.getOwningContext().abortConfigurationRun(); + } for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { outputPort.sendSignal(signal); } @@ -85,11 +89,6 @@ public abstract class AbstractStage extends Stage { return signalAlreadyReceived; } - @Override - public void onInitializing() throws Exception { - changeState(StageState.INITIALIZED); - } - private void changeState(final StageState newState) { currentState = newState; logger.trace(newState.toString()); diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d749591db16f37d27ba36a154fdd922577a5b621..a11035a31c38955dde678e29dedcc40a28838c05 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> { pipe.waitForStartSignal(); } - public void waitForInitializingSignal() throws InterruptedException { - pipe.waitForInitializingSignal(); - }; - } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 03ecfdc28774964a797f4b2923d637267f9daa36..6064511953b3f71f88ad1e13e115ce5fafdc7c55 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - for (InputPort<?> inputPort : stage.getInputPorts()) { - inputPort.waitForInitializingSignal(); - } logger.trace("waitForStartingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 9ccfe80cb4c35f32287cd27ad5d3e34b225fa16e..9ee6cf833bdc249273b8be00ca43538edd8f4c3d 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -17,14 +17,12 @@ package teetime.framework; import java.util.concurrent.Semaphore; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; public final class RunnableProducerStage extends AbstractRunnableStage { private final Semaphore startSemaphore = new Semaphore(0); - private final Semaphore initSemaphore = new Semaphore(0); RunnableProducerStage(final Stage stage) { super(stage); @@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - waitForInitializingSignal(); - this.stage.onSignal(new InitializingSignal(), null); waitForStartingSignal(); this.stage.onSignal(new StartingSignal(), null); } @@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage { this.stage.onSignal(terminatingSignal, null); } - public void triggerInitializingSignal() { - initSemaphore.release(); - } - public void triggerStartingSignal() { startSemaphore.release(); } - private void waitForInitializingSignal() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - initSemaphore.acquire(); - } - private void waitForStartingSignal() throws InterruptedException { logger.trace("waitForStartingSignal"); startSemaphore.acquire(); diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index 2670bed14fc0d61274565b7415e3cef8cc5108f2..e3e34877e6f173fde2ed0b240ca4d9e4da4bd750 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -26,4 +26,5 @@ public final class RuntimeServiceFacade { public void startWithinNewThread(final Stage previousStage, final Stage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); } + } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 582f7b3f1c128b41e8894dc6a23d1c1417e8e3f5..ae33ba66d87b285c4b6adb1f85f7b4e79ed358cd 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -165,9 +165,6 @@ public abstract class Stage { * @throws Exception * an arbitrary exception if an error occurs during the initialization */ - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onInitializing() throws Exception; - @SuppressWarnings("PMD.SignatureDeclareThrowsException") public abstract void onStarting() throws Exception; diff --git a/src/main/java/teetime/framework/TeeTimeThread.java b/src/main/java/teetime/framework/TeeTimeThread.java index dc0d93997784bf897779a4264c849304367e870f..3fe0d2fe4ceb83df56dd2a7fa6292376ff5849d2 100644 --- a/src/main/java/teetime/framework/TeeTimeThread.java +++ b/src/main/java/teetime/framework/TeeTimeThread.java @@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread { this.runnable = runnable; } - public void sendInitializingSignal() { - if (runnable instanceof RunnableProducerStage) { - ((RunnableProducerStage) runnable).triggerInitializingSignal(); - } - } - public void sendStartingSignal() { if (runnable instanceof RunnableProducerStage) { ((RunnableProducerStage) runnable).triggerStartingSignal(); diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 3e27f69b6e01a1c8f8a7524241995e0b3e0346cc..de097190d01fbe68e913b394659ef9a747d6163f 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(startStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); } void startStageAtRuntime(final Stage newStage) { @@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); sendStartingSignal(newThreadableStages); } @@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> { } } - private void sendInitializingSignal(final Set<Stage> threadableStages) { - for (Stage stage : threadableStages) { - ((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal(); - } - } - private void sendStartingSignal(final Set<Stage> newThreadableStages) { synchronized (newThreadableStages) { for (Stage stage : newThreadableStages) { diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index a74d68ddbc99159d653aaa3adcd6585e9f715f59..48639feef0f0316dbe8a73ddbcab5930a56f659b 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> { } - @Override - public void waitForInitializingSignal() throws InterruptedException { - - } - @Override public void close() { diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 7610bd7d348d5bfa8b7520feb892996dd400ee47..5f266c6fed4deb6ab0a46f47be9ea8f134cdaed1 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -97,8 +97,6 @@ public interface IPipe<T> { void waitForStartSignal() throws InterruptedException; - void waitForInitializingSignal() throws InterruptedException; - void close(); } diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index b0cae7f675e6f5d3ff85780745baa72d6b343f12..dc9f3ce55c91c793a39d446bf66eb05e4d5ffd25 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> { throw new IllegalStateException(ERROR_MESSAGE); } - @Override - public void waitForInitializingSignal() throws InterruptedException { - throw new IllegalStateException(ERROR_MESSAGE); - } - @Override public void close() { throw new IllegalStateException(ERROR_MESSAGE); diff --git a/src/main/java/teetime/framework/signal/AbstractSignal.java b/src/main/java/teetime/framework/signal/AbstractSignal.java deleted file mode 100644 index cd67b53cebfebd785c6fe3fdfecc3f7462a05782..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/signal/AbstractSignal.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.signal; - -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class AbstractSignal implements ISignal { - - protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSignal.class); - - protected final List<Exception> catchedExceptions = new LinkedList<Exception>(); - - protected AbstractSignal() { - super(); - } - - public List<Exception> getCatchedExceptions() { - return this.catchedExceptions; - } - -} diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 13ecf747a31ae2d69a8d697267e8b7cf81e51f4b..80469661a1a5d7ff68667135081f7888c668ccd7 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -23,7 +23,7 @@ import teetime.framework.Stage; public interface ISignal { - void trigger(Stage stage); + void trigger(Stage stage) throws Exception; // Only used by the merger so far boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts); diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java deleted file mode 100644 index 5de5d00fdf15cc5b49a1bfe13c2ead05edebfab7..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.signal; - -import java.util.List; -import java.util.Set; - -import teetime.framework.InputPort; -import teetime.framework.Stage; - -public final class InitializingSignal extends AbstractSignal { - - @Override - public void trigger(final Stage stage) { - try { - stage.onInitializing(); - } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) - this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the initializing signal", e); - } - } - - @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return true; - } - -} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 5b547dd5e6fb6253af064271b5578fd72598adf3..f8e001592ba6250696beef8201c61fd8c64307da 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -21,16 +21,11 @@ import java.util.Set; import teetime.framework.InputPort; import teetime.framework.Stage; -public final class StartingSignal extends AbstractSignal { +public final class StartingSignal implements ISignal { @Override - public void trigger(final Stage stage) { - try { - stage.onStarting(); - } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) - this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the start signal", e); - } + public void trigger(final Stage stage) throws Exception { + stage.onStarting(); } @Override diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 32ec5bf20e9d38a56c0f949d32954cae4490be52..f17b532a66bb26b634f5de32d35707e5a2249a32 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -21,16 +21,11 @@ import java.util.Set; import teetime.framework.InputPort; import teetime.framework.Stage; -public final class TerminatingSignal extends AbstractSignal { +public final class TerminatingSignal implements ISignal { @Override - public void trigger(final Stage stage) { - try { - stage.onTerminating(); - } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) - this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the termination signal", e); - } + public void trigger(final Stage stage) throws Exception { + stage.onTerminating(); } @Override diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index bc6faa47df1868e9b49235ad1d4a9b6439d261dc..3110e3cb12b37840e0e960bee1085c2c1b5de289 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic; import java.util.ArrayList; import java.util.List; -import teetime.framework.RuntimeServiceFacade; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; @@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); - newOutputPort.sendSignal(new InitializingSignal()); newOutputPort.sendSignal(new StartingSignal()); // FIXME pass the new thread to the analysis so that it can terminate the thread at the end diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index 24d7a085e1685e8e3873ee9dc339a878fc6039ce..c0f8e6bfca0bcb2f3bd55c9312304fff6f4e9b3a 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -40,12 +40,8 @@ public class RunnableProducerStageTest { thread.start(); - // Not running and not initialized - assertFalse(testStage.executed && testStage.initialized); - runnable.triggerInitializingSignal(); - // Not running, but initialized - assertFalse(testStage.executed && !testStage.initialized); + assertFalse(testStage.executed); runnable.triggerStartingSignal(); thread.join(); diff --git a/src/test/java/teetime/framework/RunnableTestStage.java b/src/test/java/teetime/framework/RunnableTestStage.java index 5fa78aadf0690da4b8bf62a01e9d40a68218a969..859741e095d5eb8789f270bf670226d8459ac56a 100644 --- a/src/test/java/teetime/framework/RunnableTestStage.java +++ b/src/test/java/teetime/framework/RunnableTestStage.java @@ -25,10 +25,4 @@ class RunnableTestStage extends AbstractProducerStage<Object> { this.terminate(); } - @Override - public void onInitializing() throws Exception { - super.onInitializing(); - initialized = true; - } - } diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index 3cc20d78487f1e5754f797f36cce9f3b3cf286a8..8e61718b5ba968f8e97775fec8a23dc914bc09d8 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -27,7 +27,6 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; @@ -47,14 +46,11 @@ public class SpScPipeTest { List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); - signals.add(new InitializingSignal()); signals.add(new TerminatingSignal()); signals.add(new ValidatingSignal()); diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 62b2d97b49eebce2ef8a66be1b1a0efb4b6d258d..e0bd8f0a4035ddced7437c4815daeb7b56729cb3 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -107,9 +107,6 @@ class MergerTestingPipe implements IPipe { @Override public void waitForStartSignal() throws InterruptedException {} - @Override - public void waitForInitializingSignal() throws InterruptedException {} - @Override public void close() {}