Skip to content
Snippets Groups Projects
Commit f4bb7747 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

removed InitializingSignal, added abort method in RuntimeServiceFacade

parent 9fbc4116
No related branches found
No related tags found
No related merge requests found
Showing
with 8 additions and 132 deletions
...@@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering; ...@@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.PCBlockingQueue;
import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy;
...@@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { ...@@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> {
// do nothing // do nothing
} }
@Override
public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName());
}
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override @Override
public final void waitForStartSignal() throws InterruptedException { public final void waitForStartSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
......
...@@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { ...@@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> {
// do nothing // do nothing
} }
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void waitForInitializingSignal() throws InterruptedException {
// do nothing
}
} }
...@@ -85,11 +85,6 @@ public abstract class AbstractStage extends Stage { ...@@ -85,11 +85,6 @@ public abstract class AbstractStage extends Stage {
return signalAlreadyReceived; return signalAlreadyReceived;
} }
@Override
public void onInitializing() throws Exception {
changeState(StageState.INITIALIZED);
}
private void changeState(final StageState newState) { private void changeState(final StageState newState) {
currentState = newState; currentState = newState;
logger.trace(newState.toString()); logger.trace(newState.toString());
......
...@@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> { ...@@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> {
pipe.waitForStartSignal(); pipe.waitForStartSignal();
} }
public void waitForInitializingSignal() throws InterruptedException {
pipe.waitForInitializingSignal();
};
} }
...@@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@Override @Override
protected void beforeStageExecution() throws InterruptedException { protected void beforeStageExecution() throws InterruptedException {
logger.trace("waitForInitializingSignal");
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForInitializingSignal();
}
logger.trace("waitForStartingSignal"); logger.trace("waitForStartingSignal");
for (InputPort<?> inputPort : stage.getInputPorts()) { for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForStartSignal(); inputPort.waitForStartSignal();
......
...@@ -17,14 +17,12 @@ package teetime.framework; ...@@ -17,14 +17,12 @@ package teetime.framework;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends AbstractRunnableStage { public final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0); private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0);
RunnableProducerStage(final Stage stage) { RunnableProducerStage(final Stage stage) {
super(stage); super(stage);
...@@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage { ...@@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage {
@Override @Override
protected void beforeStageExecution() throws InterruptedException { protected void beforeStageExecution() throws InterruptedException {
waitForInitializingSignal();
this.stage.onSignal(new InitializingSignal(), null);
waitForStartingSignal(); waitForStartingSignal();
this.stage.onSignal(new StartingSignal(), null); this.stage.onSignal(new StartingSignal(), null);
} }
...@@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage { ...@@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage {
this.stage.onSignal(terminatingSignal, null); this.stage.onSignal(terminatingSignal, null);
} }
public void triggerInitializingSignal() {
initSemaphore.release();
}
public void triggerStartingSignal() { public void triggerStartingSignal() {
startSemaphore.release(); startSemaphore.release();
} }
private void waitForInitializingSignal() throws InterruptedException {
logger.trace("waitForInitializingSignal");
initSemaphore.acquire();
}
private void waitForStartingSignal() throws InterruptedException { private void waitForStartingSignal() throws InterruptedException {
logger.trace("waitForStartingSignal"); logger.trace("waitForStartingSignal");
startSemaphore.acquire(); startSemaphore.acquire();
......
...@@ -26,4 +26,8 @@ public final class RuntimeServiceFacade { ...@@ -26,4 +26,8 @@ public final class RuntimeServiceFacade {
public void startWithinNewThread(final Stage previousStage, final Stage stage) { public void startWithinNewThread(final Stage previousStage, final Stage stage) {
previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage);
} }
public void abortExecution(final Stage stage) {
stage.getOwningContext().abortConfigurationRun();
}
} }
...@@ -165,9 +165,6 @@ public abstract class Stage { ...@@ -165,9 +165,6 @@ public abstract class Stage {
* @throws Exception * @throws Exception
* an arbitrary exception if an error occurs during the initialization * an arbitrary exception if an error occurs during the initialization
*/ */
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onInitializing() throws Exception;
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onStarting() throws Exception; public abstract void onStarting() throws Exception;
......
...@@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread { ...@@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread {
this.runnable = runnable; this.runnable = runnable;
} }
public void sendInitializingSignal() {
if (runnable instanceof RunnableProducerStage) {
((RunnableProducerStage) runnable).triggerInitializingSignal();
}
}
public void sendStartingSignal() { public void sendStartingSignal() {
if (runnable instanceof RunnableProducerStage) { if (runnable instanceof RunnableProducerStage) {
((RunnableProducerStage) runnable).triggerStartingSignal(); ((RunnableProducerStage) runnable).triggerStartingSignal();
......
...@@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> {
Set<Stage> newThreadableStages = initialize(startStage); Set<Stage> newThreadableStages = initialize(startStage);
startThreads(newThreadableStages); startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
} }
void startStageAtRuntime(final Stage newStage) { void startStageAtRuntime(final Stage newStage) {
...@@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> {
Set<Stage> newThreadableStages = initialize(newStage); Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages); startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
sendStartingSignal(newThreadableStages); sendStartingSignal(newThreadableStages);
} }
...@@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> {
} }
} }
private void sendInitializingSignal(final Set<Stage> threadableStages) {
for (Stage stage : threadableStages) {
((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal();
}
}
private void sendStartingSignal(final Set<Stage> newThreadableStages) { private void sendStartingSignal(final Set<Stage> newThreadableStages) {
for (Stage stage : newThreadableStages) { for (Stage stage : newThreadableStages) {
((TeeTimeThread) stage.getOwningThread()).sendStartingSignal(); ((TeeTimeThread) stage.getOwningThread()).sendStartingSignal();
......
...@@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> { ...@@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> {
} }
@Override
public void waitForInitializingSignal() throws InterruptedException {
}
@Override @Override
public void close() { public void close() {
......
...@@ -97,8 +97,6 @@ public interface IPipe<T> { ...@@ -97,8 +97,6 @@ public interface IPipe<T> {
void waitForStartSignal() throws InterruptedException; void waitForStartSignal() throws InterruptedException;
void waitForInitializingSignal() throws InterruptedException;
void close(); void close();
} }
...@@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> { ...@@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> {
throw new IllegalStateException(ERROR_MESSAGE); throw new IllegalStateException(ERROR_MESSAGE);
} }
@Override
public void waitForInitializingSignal() throws InterruptedException {
throw new IllegalStateException(ERROR_MESSAGE);
}
@Override @Override
public void close() { public void close() {
throw new IllegalStateException(ERROR_MESSAGE); throw new IllegalStateException(ERROR_MESSAGE);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.signal;
import java.util.List;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class InitializingSignal extends AbstractSignal {
@Override
public void trigger(final Stage stage) {
try {
stage.onInitializing();
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the initializing signal", e);
}
}
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return true;
}
}
...@@ -19,6 +19,7 @@ import java.util.List; ...@@ -19,6 +19,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.Stage; import teetime.framework.Stage;
public final class StartingSignal extends AbstractSignal { public final class StartingSignal extends AbstractSignal {
...@@ -29,6 +30,7 @@ public final class StartingSignal extends AbstractSignal { ...@@ -29,6 +30,7 @@ public final class StartingSignal extends AbstractSignal {
stage.onStarting(); stage.onStarting();
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e); this.catchedExceptions.add(e);
RuntimeServiceFacade.INSTANCE.abortExecution(stage);
LOGGER.error("Exception while sending the start signal", e); LOGGER.error("Exception while sending the start signal", e);
} }
} }
......
...@@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic; ...@@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.util.framework.port.PortAction; import teetime.util.framework.port.PortAction;
...@@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { ...@@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal()); newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end // FIXME pass the new thread to the analysis so that it can terminate the thread at the end
......
...@@ -40,12 +40,8 @@ public class RunnableProducerStageTest { ...@@ -40,12 +40,8 @@ public class RunnableProducerStageTest {
thread.start(); thread.start();
// Not running and not initialized
assertFalse(testStage.executed && testStage.initialized);
runnable.triggerInitializingSignal();
// Not running, but initialized // Not running, but initialized
assertFalse(testStage.executed && !testStage.initialized); assertFalse(testStage.executed);
runnable.triggerStartingSignal(); runnable.triggerStartingSignal();
thread.join(); thread.join();
......
...@@ -25,10 +25,4 @@ class RunnableTestStage extends AbstractProducerStage<Object> { ...@@ -25,10 +25,4 @@ class RunnableTestStage extends AbstractProducerStage<Object> {
this.terminate(); this.terminate();
} }
@Override
public void onInitializing() throws Exception {
super.onInitializing();
initialized = true;
}
} }
...@@ -27,7 +27,6 @@ import teetime.framework.AbstractInterThreadPipe; ...@@ -27,7 +27,6 @@ import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
...@@ -47,14 +46,11 @@ public class SpScPipeTest { ...@@ -47,14 +46,11 @@ public class SpScPipeTest {
List<ISignal> signals = new ArrayList<ISignal>(); List<ISignal> signals = new ArrayList<ISignal>();
signals.add(new StartingSignal()); signals.add(new StartingSignal());
signals.add(new TerminatingSignal()); signals.add(new TerminatingSignal());
signals.add(new InitializingSignal());
signals.add(new ValidatingSignal()); signals.add(new ValidatingSignal());
signals.add(new StartingSignal()); signals.add(new StartingSignal());
signals.add(new TerminatingSignal()); signals.add(new TerminatingSignal());
signals.add(new InitializingSignal());
signals.add(new ValidatingSignal()); signals.add(new ValidatingSignal());
signals.add(new StartingSignal()); signals.add(new StartingSignal());
signals.add(new InitializingSignal());
signals.add(new TerminatingSignal()); signals.add(new TerminatingSignal());
signals.add(new ValidatingSignal()); signals.add(new ValidatingSignal());
......
...@@ -107,9 +107,6 @@ class MergerTestingPipe implements IPipe { ...@@ -107,9 +107,6 @@ class MergerTestingPipe implements IPipe {
@Override @Override
public void waitForStartSignal() throws InterruptedException {} public void waitForStartSignal() throws InterruptedException {}
@Override
public void waitForInitializingSignal() throws InterruptedException {}
@Override @Override
public void close() {} public void close() {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment