diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index b3f25a75acbb0c255cdcbb6c1a0c5e3550650f97..f86e7f636534f275f3da78567becb1cbb11ce544 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; @@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @Override - public final void waitForInitializingSignal() throws InterruptedException { - final ISignal signal = signalQueue.take(); - if (!(signal instanceof InitializingSignal)) { - throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName()); - } - cachedTargetStage.onSignal(signal, getTargetPort()); - } - @Override public final void waitForStartSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 2be7f3e39aba05e91c22fe8226bc464a378a6e00..3c8f7c6c073b3e72948ab61c9d38ec9428f72b90 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") - @Override - public void waitForInitializingSignal() throws InterruptedException { - // do nothing - } } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index ec7070130bc0f6c3e3a41361b9282c192938f085..308165a764b4100c8b730b0eb345f0ca51c50988 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -85,11 +85,6 @@ public abstract class AbstractStage extends Stage { return signalAlreadyReceived; } - @Override - public void onInitializing() throws Exception { - changeState(StageState.INITIALIZED); - } - private void changeState(final StageState newState) { currentState = newState; logger.trace(newState.toString()); diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d749591db16f37d27ba36a154fdd922577a5b621..a11035a31c38955dde678e29dedcc40a28838c05 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> { pipe.waitForStartSignal(); } - public void waitForInitializingSignal() throws InterruptedException { - pipe.waitForInitializingSignal(); - }; - } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 03ecfdc28774964a797f4b2923d637267f9daa36..6064511953b3f71f88ad1e13e115ce5fafdc7c55 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - for (InputPort<?> inputPort : stage.getInputPorts()) { - inputPort.waitForInitializingSignal(); - } logger.trace("waitForStartingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 9ccfe80cb4c35f32287cd27ad5d3e34b225fa16e..9ee6cf833bdc249273b8be00ca43538edd8f4c3d 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -17,14 +17,12 @@ package teetime.framework; import java.util.concurrent.Semaphore; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; public final class RunnableProducerStage extends AbstractRunnableStage { private final Semaphore startSemaphore = new Semaphore(0); - private final Semaphore initSemaphore = new Semaphore(0); RunnableProducerStage(final Stage stage) { super(stage); @@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - waitForInitializingSignal(); - this.stage.onSignal(new InitializingSignal(), null); waitForStartingSignal(); this.stage.onSignal(new StartingSignal(), null); } @@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage { this.stage.onSignal(terminatingSignal, null); } - public void triggerInitializingSignal() { - initSemaphore.release(); - } - public void triggerStartingSignal() { startSemaphore.release(); } - private void waitForInitializingSignal() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - initSemaphore.acquire(); - } - private void waitForStartingSignal() throws InterruptedException { logger.trace("waitForStartingSignal"); startSemaphore.acquire(); diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index 2670bed14fc0d61274565b7415e3cef8cc5108f2..b557b07998910de135d19057814458bd9b72f8fe 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -26,4 +26,8 @@ public final class RuntimeServiceFacade { public void startWithinNewThread(final Stage previousStage, final Stage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); } + + public void abortExecution(final Stage stage) { + stage.getOwningContext().abortConfigurationRun(); + } } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 582f7b3f1c128b41e8894dc6a23d1c1417e8e3f5..ae33ba66d87b285c4b6adb1f85f7b4e79ed358cd 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -165,9 +165,6 @@ public abstract class Stage { * @throws Exception * an arbitrary exception if an error occurs during the initialization */ - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onInitializing() throws Exception; - @SuppressWarnings("PMD.SignatureDeclareThrowsException") public abstract void onStarting() throws Exception; diff --git a/src/main/java/teetime/framework/TeeTimeThread.java b/src/main/java/teetime/framework/TeeTimeThread.java index dc0d93997784bf897779a4264c849304367e870f..3fe0d2fe4ceb83df56dd2a7fa6292376ff5849d2 100644 --- a/src/main/java/teetime/framework/TeeTimeThread.java +++ b/src/main/java/teetime/framework/TeeTimeThread.java @@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread { this.runnable = runnable; } - public void sendInitializingSignal() { - if (runnable instanceof RunnableProducerStage) { - ((RunnableProducerStage) runnable).triggerInitializingSignal(); - } - } - public void sendStartingSignal() { if (runnable instanceof RunnableProducerStage) { ((RunnableProducerStage) runnable).triggerStartingSignal(); diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 3197606ed3e6eaa18bf8b6e6c070b859d3465d74..ae0d49d11367469caa51f1a3937e2bd08313594d 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(startStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); } void startStageAtRuntime(final Stage newStage) { @@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); sendStartingSignal(newThreadableStages); } @@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> { } } - private void sendInitializingSignal(final Set<Stage> threadableStages) { - for (Stage stage : threadableStages) { - ((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal(); - } - } - private void sendStartingSignal(final Set<Stage> newThreadableStages) { for (Stage stage : newThreadableStages) { ((TeeTimeThread) stage.getOwningThread()).sendStartingSignal(); diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index a74d68ddbc99159d653aaa3adcd6585e9f715f59..48639feef0f0316dbe8a73ddbcab5930a56f659b 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> { } - @Override - public void waitForInitializingSignal() throws InterruptedException { - - } - @Override public void close() { diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 7610bd7d348d5bfa8b7520feb892996dd400ee47..5f266c6fed4deb6ab0a46f47be9ea8f134cdaed1 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -97,8 +97,6 @@ public interface IPipe<T> { void waitForStartSignal() throws InterruptedException; - void waitForInitializingSignal() throws InterruptedException; - void close(); } diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index b0cae7f675e6f5d3ff85780745baa72d6b343f12..dc9f3ce55c91c793a39d446bf66eb05e4d5ffd25 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> { throw new IllegalStateException(ERROR_MESSAGE); } - @Override - public void waitForInitializingSignal() throws InterruptedException { - throw new IllegalStateException(ERROR_MESSAGE); - } - @Override public void close() { throw new IllegalStateException(ERROR_MESSAGE); diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java deleted file mode 100644 index 5de5d00fdf15cc5b49a1bfe13c2ead05edebfab7..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.signal; - -import java.util.List; -import java.util.Set; - -import teetime.framework.InputPort; -import teetime.framework.Stage; - -public final class InitializingSignal extends AbstractSignal { - - @Override - public void trigger(final Stage stage) { - try { - stage.onInitializing(); - } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) - this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the initializing signal", e); - } - } - - @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return true; - } - -} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 5b547dd5e6fb6253af064271b5578fd72598adf3..cb668ba995174e82d2b0643c327e5729d54c986b 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.Stage; public final class StartingSignal extends AbstractSignal { @@ -29,6 +30,7 @@ public final class StartingSignal extends AbstractSignal { stage.onStarting(); } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) this.catchedExceptions.add(e); + RuntimeServiceFacade.INSTANCE.abortExecution(stage); LOGGER.error("Exception while sending the start signal", e); } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index bc6faa47df1868e9b49235ad1d4a9b6439d261dc..3110e3cb12b37840e0e960bee1085c2c1b5de289 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic; import java.util.ArrayList; import java.util.List; -import teetime.framework.RuntimeServiceFacade; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; @@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); - newOutputPort.sendSignal(new InitializingSignal()); newOutputPort.sendSignal(new StartingSignal()); // FIXME pass the new thread to the analysis so that it can terminate the thread at the end diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index 24d7a085e1685e8e3873ee9dc339a878fc6039ce..c0f8e6bfca0bcb2f3bd55c9312304fff6f4e9b3a 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -40,12 +40,8 @@ public class RunnableProducerStageTest { thread.start(); - // Not running and not initialized - assertFalse(testStage.executed && testStage.initialized); - runnable.triggerInitializingSignal(); - // Not running, but initialized - assertFalse(testStage.executed && !testStage.initialized); + assertFalse(testStage.executed); runnable.triggerStartingSignal(); thread.join(); diff --git a/src/test/java/teetime/framework/RunnableTestStage.java b/src/test/java/teetime/framework/RunnableTestStage.java index 5fa78aadf0690da4b8bf62a01e9d40a68218a969..859741e095d5eb8789f270bf670226d8459ac56a 100644 --- a/src/test/java/teetime/framework/RunnableTestStage.java +++ b/src/test/java/teetime/framework/RunnableTestStage.java @@ -25,10 +25,4 @@ class RunnableTestStage extends AbstractProducerStage<Object> { this.terminate(); } - @Override - public void onInitializing() throws Exception { - super.onInitializing(); - initialized = true; - } - } diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index 3cc20d78487f1e5754f797f36cce9f3b3cf286a8..8e61718b5ba968f8e97775fec8a23dc914bc09d8 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -27,7 +27,6 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; @@ -47,14 +46,11 @@ public class SpScPipeTest { List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); - signals.add(new InitializingSignal()); signals.add(new TerminatingSignal()); signals.add(new ValidatingSignal()); diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 62b2d97b49eebce2ef8a66be1b1a0efb4b6d258d..e0bd8f0a4035ddced7437c4815daeb7b56729cb3 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -107,9 +107,6 @@ class MergerTestingPipe implements IPipe { @Override public void waitForStartSignal() throws InterruptedException {} - @Override - public void waitForInitializingSignal() throws InterruptedException {} - @Override public void close() {}