From f4bb7747826b048cde3d5bd51f36e0ad8e7e6e97 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Thu, 6 Aug 2015 14:44:16 +0200 Subject: [PATCH] removed InitializingSignal, added abort method in RuntimeServiceFacade --- .../framework/AbstractInterThreadPipe.java | 10 ----- .../framework/AbstractIntraThreadPipe.java | 5 --- .../java/teetime/framework/AbstractStage.java | 5 --- .../java/teetime/framework/InputPort.java | 4 -- .../framework/RunnableConsumerStage.java | 4 -- .../framework/RunnableProducerStage.java | 13 ------ .../framework/RuntimeServiceFacade.java | 4 ++ src/main/java/teetime/framework/Stage.java | 3 -- .../java/teetime/framework/TeeTimeThread.java | 6 --- .../java/teetime/framework/ThreadService.java | 8 ---- .../teetime/framework/pipe/DummyPipe.java | 5 --- .../java/teetime/framework/pipe/IPipe.java | 2 - .../framework/pipe/InstantiationPipe.java | 5 --- .../framework/signal/InitializingSignal.java | 41 ------------------- .../framework/signal/StartingSignal.java | 2 + .../distributor/dynamic/CreatePortAction.java | 4 +- .../framework/RunnableProducerStageTest.java | 6 +-- .../teetime/framework/RunnableTestStage.java | 6 --- .../teetime/framework/pipe/SpScPipeTest.java | 4 -- .../stage/basic/merger/MergerTestingPipe.java | 3 -- 20 files changed, 8 insertions(+), 132 deletions(-) delete mode 100644 src/main/java/teetime/framework/signal/InitializingSignal.java diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index b3f25a75..f86e7f63 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; @@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @Override - public final void waitForInitializingSignal() throws InterruptedException { - final ISignal signal = signalQueue.take(); - if (!(signal instanceof InitializingSignal)) { - throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName()); - } - cachedTargetStage.onSignal(signal, getTargetPort()); - } - @Override public final void waitForStartSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 2be7f3e3..3c8f7c6c 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { // do nothing } - @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") - @Override - public void waitForInitializingSignal() throws InterruptedException { - // do nothing - } } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index ec707013..308165a7 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -85,11 +85,6 @@ public abstract class AbstractStage extends Stage { return signalAlreadyReceived; } - @Override - public void onInitializing() throws Exception { - changeState(StageState.INITIALIZED); - } - private void changeState(final StageState newState) { currentState = newState; logger.trace(newState.toString()); diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d749591d..a11035a3 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> { pipe.waitForStartSignal(); } - public void waitForInitializingSignal() throws InterruptedException { - pipe.waitForInitializingSignal(); - }; - } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 03ecfdc2..60645119 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - for (InputPort<?> inputPort : stage.getInputPorts()) { - inputPort.waitForInitializingSignal(); - } logger.trace("waitForStartingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 9ccfe80c..9ee6cf83 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -17,14 +17,12 @@ package teetime.framework; import java.util.concurrent.Semaphore; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; public final class RunnableProducerStage extends AbstractRunnableStage { private final Semaphore startSemaphore = new Semaphore(0); - private final Semaphore initSemaphore = new Semaphore(0); RunnableProducerStage(final Stage stage) { super(stage); @@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { - waitForInitializingSignal(); - this.stage.onSignal(new InitializingSignal(), null); waitForStartingSignal(); this.stage.onSignal(new StartingSignal(), null); } @@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage { this.stage.onSignal(terminatingSignal, null); } - public void triggerInitializingSignal() { - initSemaphore.release(); - } - public void triggerStartingSignal() { startSemaphore.release(); } - private void waitForInitializingSignal() throws InterruptedException { - logger.trace("waitForInitializingSignal"); - initSemaphore.acquire(); - } - private void waitForStartingSignal() throws InterruptedException { logger.trace("waitForStartingSignal"); startSemaphore.acquire(); diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index 2670bed1..b557b079 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -26,4 +26,8 @@ public final class RuntimeServiceFacade { public void startWithinNewThread(final Stage previousStage, final Stage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); } + + public void abortExecution(final Stage stage) { + stage.getOwningContext().abortConfigurationRun(); + } } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 582f7b3f..ae33ba66 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -165,9 +165,6 @@ public abstract class Stage { * @throws Exception * an arbitrary exception if an error occurs during the initialization */ - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onInitializing() throws Exception; - @SuppressWarnings("PMD.SignatureDeclareThrowsException") public abstract void onStarting() throws Exception; diff --git a/src/main/java/teetime/framework/TeeTimeThread.java b/src/main/java/teetime/framework/TeeTimeThread.java index dc0d9399..3fe0d2fe 100644 --- a/src/main/java/teetime/framework/TeeTimeThread.java +++ b/src/main/java/teetime/framework/TeeTimeThread.java @@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread { this.runnable = runnable; } - public void sendInitializingSignal() { - if (runnable instanceof RunnableProducerStage) { - ((RunnableProducerStage) runnable).triggerInitializingSignal(); - } - } - public void sendStartingSignal() { if (runnable instanceof RunnableProducerStage) { ((RunnableProducerStage) runnable).triggerStartingSignal(); diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 3197606e..ae0d49d1 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(startStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); } void startStageAtRuntime(final Stage newStage) { @@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> { Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); sendStartingSignal(newThreadableStages); } @@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> { } } - private void sendInitializingSignal(final Set<Stage> threadableStages) { - for (Stage stage : threadableStages) { - ((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal(); - } - } - private void sendStartingSignal(final Set<Stage> newThreadableStages) { for (Stage stage : newThreadableStages) { ((TeeTimeThread) stage.getOwningThread()).sendStartingSignal(); diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index a74d68dd..48639fee 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> { } - @Override - public void waitForInitializingSignal() throws InterruptedException { - - } - @Override public void close() { diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 7610bd7d..5f266c6f 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -97,8 +97,6 @@ public interface IPipe<T> { void waitForStartSignal() throws InterruptedException; - void waitForInitializingSignal() throws InterruptedException; - void close(); } diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index b0cae7f6..dc9f3ce5 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> { throw new IllegalStateException(ERROR_MESSAGE); } - @Override - public void waitForInitializingSignal() throws InterruptedException { - throw new IllegalStateException(ERROR_MESSAGE); - } - @Override public void close() { throw new IllegalStateException(ERROR_MESSAGE); diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java deleted file mode 100644 index 5de5d00f..00000000 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.signal; - -import java.util.List; -import java.util.Set; - -import teetime.framework.InputPort; -import teetime.framework.Stage; - -public final class InitializingSignal extends AbstractSignal { - - @Override - public void trigger(final Stage stage) { - try { - stage.onInitializing(); - } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) - this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the initializing signal", e); - } - } - - @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return true; - } - -} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 5b547dd5..cb668ba9 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.Stage; public final class StartingSignal extends AbstractSignal { @@ -29,6 +30,7 @@ public final class StartingSignal extends AbstractSignal { stage.onStarting(); } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) this.catchedExceptions.add(e); + RuntimeServiceFacade.INSTANCE.abortExecution(stage); LOGGER.error("Exception while sending the start signal", e); } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index bc6faa47..3110e3cb 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic; import java.util.ArrayList; import java.util.List; -import teetime.framework.RuntimeServiceFacade; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; @@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); - newOutputPort.sendSignal(new InitializingSignal()); newOutputPort.sendSignal(new StartingSignal()); // FIXME pass the new thread to the analysis so that it can terminate the thread at the end diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index 24d7a085..c0f8e6bf 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -40,12 +40,8 @@ public class RunnableProducerStageTest { thread.start(); - // Not running and not initialized - assertFalse(testStage.executed && testStage.initialized); - runnable.triggerInitializingSignal(); - // Not running, but initialized - assertFalse(testStage.executed && !testStage.initialized); + assertFalse(testStage.executed); runnable.triggerStartingSignal(); thread.join(); diff --git a/src/test/java/teetime/framework/RunnableTestStage.java b/src/test/java/teetime/framework/RunnableTestStage.java index 5fa78aad..859741e0 100644 --- a/src/test/java/teetime/framework/RunnableTestStage.java +++ b/src/test/java/teetime/framework/RunnableTestStage.java @@ -25,10 +25,4 @@ class RunnableTestStage extends AbstractProducerStage<Object> { this.terminate(); } - @Override - public void onInitializing() throws Exception { - super.onInitializing(); - initialized = true; - } - } diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index 3cc20d78..8e61718b 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -27,7 +27,6 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; @@ -47,14 +46,11 @@ public class SpScPipeTest { List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); signals.add(new TerminatingSignal()); - signals.add(new InitializingSignal()); signals.add(new ValidatingSignal()); signals.add(new StartingSignal()); - signals.add(new InitializingSignal()); signals.add(new TerminatingSignal()); signals.add(new ValidatingSignal()); diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 62b2d97b..e0bd8f0a 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -107,9 +107,6 @@ class MergerTestingPipe implements IPipe { @Override public void waitForStartSignal() throws InterruptedException {} - @Override - public void waitForInitializingSignal() throws InterruptedException {} - @Override public void close() {} -- GitLab