Skip to content
Snippets Groups Projects
Commit 8e2c0366 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'abort-init' into 'master'

Abort init

fixes #170

See merge request !51
parents 4f9044eb 1ee2e6fc
No related branches found
No related tags found
No related merge requests found
Showing
with 20 additions and 173 deletions
......@@ -24,7 +24,6 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.util.framework.concurrent.queue.PCBlockingQueue;
import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy;
......@@ -65,15 +64,6 @@ public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> {
// do nothing
}
@Override
public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName());
}
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override
public final void waitForStartSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
......
......@@ -57,9 +57,4 @@ public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> {
// do nothing
}
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void waitForInitializingSignal() throws InterruptedException {
// do nothing
}
}
......@@ -49,9 +49,9 @@ abstract class AbstractRunnableStage implements Runnable {
throw new IllegalArgumentException("Argument stage may not have a nullable owning context");
}
try {
do {
while (!stage.shouldBeTerminated()) {
executeStage();
} while (!stage.shouldBeTerminated());
}
} catch (TerminateException e) {
this.stage.terminate();
stage.getOwningContext().abortConfigurationRun();
......@@ -65,13 +65,16 @@ abstract class AbstractRunnableStage implements Runnable {
} catch (InterruptedException e) {
this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
}
} finally {
} finally
{
if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
stage.getOwningContext().getThreadService().getRunnableCounter().dec();
}
}
logger.debug("Finished runnable stage. (" + stage.getId() + ")");
}
protected abstract void beforeStageExecution() throws InterruptedException;
......
......@@ -55,7 +55,11 @@ public abstract class AbstractStage extends Stage {
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);
try {
signal.trigger(this);
} catch (Exception e) {
this.getOwningContext().abortConfigurationRun();
}
for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) {
outputPort.sendSignal(signal);
}
......@@ -85,11 +89,6 @@ public abstract class AbstractStage extends Stage {
return signalAlreadyReceived;
}
@Override
public void onInitializing() throws Exception {
changeState(StageState.INITIALIZED);
}
private void changeState(final StageState newState) {
currentState = newState;
logger.trace(newState.toString());
......
......@@ -47,8 +47,4 @@ public class InputPort<T> extends AbstractPort<T> {
pipe.waitForStartSignal();
}
public void waitForInitializingSignal() throws InterruptedException {
pipe.waitForInitializingSignal();
};
}
......@@ -32,10 +32,6 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@Override
protected void beforeStageExecution() throws InterruptedException {
logger.trace("waitForInitializingSignal");
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForInitializingSignal();
}
logger.trace("waitForStartingSignal");
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForStartSignal();
......
......@@ -17,14 +17,12 @@ package teetime.framework;
import java.util.concurrent.Semaphore;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0);
RunnableProducerStage(final Stage stage) {
super(stage);
......@@ -32,8 +30,6 @@ public final class RunnableProducerStage extends AbstractRunnableStage {
@Override
protected void beforeStageExecution() throws InterruptedException {
waitForInitializingSignal();
this.stage.onSignal(new InitializingSignal(), null);
waitForStartingSignal();
this.stage.onSignal(new StartingSignal(), null);
}
......@@ -49,19 +45,10 @@ public final class RunnableProducerStage extends AbstractRunnableStage {
this.stage.onSignal(terminatingSignal, null);
}
public void triggerInitializingSignal() {
initSemaphore.release();
}
public void triggerStartingSignal() {
startSemaphore.release();
}
private void waitForInitializingSignal() throws InterruptedException {
logger.trace("waitForInitializingSignal");
initSemaphore.acquire();
}
private void waitForStartingSignal() throws InterruptedException {
logger.trace("waitForStartingSignal");
startSemaphore.acquire();
......
......@@ -26,4 +26,5 @@ public final class RuntimeServiceFacade {
public void startWithinNewThread(final Stage previousStage, final Stage stage) {
previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage);
}
}
......@@ -165,9 +165,6 @@ public abstract class Stage {
* @throws Exception
* an arbitrary exception if an error occurs during the initialization
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onInitializing() throws Exception;
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onStarting() throws Exception;
......
......@@ -24,12 +24,6 @@ public class TeeTimeThread extends Thread {
this.runnable = runnable;
}
public void sendInitializingSignal() {
if (runnable instanceof RunnableProducerStage) {
((RunnableProducerStage) runnable).triggerInitializingSignal();
}
}
public void sendStartingSignal() {
if (runnable instanceof RunnableProducerStage) {
((RunnableProducerStage) runnable).triggerStartingSignal();
......
......@@ -58,7 +58,6 @@ class ThreadService extends AbstractService<ThreadService> {
Set<Stage> newThreadableStages = initialize(startStage);
startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
}
void startStageAtRuntime(final Stage newStage) {
......@@ -66,7 +65,6 @@ class ThreadService extends AbstractService<ThreadService> {
Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
sendStartingSignal(newThreadableStages);
}
......@@ -130,12 +128,6 @@ class ThreadService extends AbstractService<ThreadService> {
}
}
private void sendInitializingSignal(final Set<Stage> threadableStages) {
for (Stage stage : threadableStages) {
((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal();
}
}
private void sendStartingSignal(final Set<Stage> newThreadableStages) {
synchronized (newThreadableStages) {
for (Stage stage : newThreadableStages) {
......
......@@ -91,11 +91,6 @@ public final class DummyPipe implements IPipe<Object> {
}
@Override
public void waitForInitializingSignal() throws InterruptedException {
}
@Override
public void close() {
......
......@@ -97,8 +97,6 @@ public interface IPipe<T> {
void waitForStartSignal() throws InterruptedException;
void waitForInitializingSignal() throws InterruptedException;
void close();
}
......@@ -101,11 +101,6 @@ public class InstantiationPipe<T> implements IPipe<T> {
throw new IllegalStateException(ERROR_MESSAGE);
}
@Override
public void waitForInitializingSignal() throws InterruptedException {
throw new IllegalStateException(ERROR_MESSAGE);
}
@Override
public void close() {
throw new IllegalStateException(ERROR_MESSAGE);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.signal;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractSignal implements ISignal {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSignal.class);
protected final List<Exception> catchedExceptions = new LinkedList<Exception>();
protected AbstractSignal() {
super();
}
public List<Exception> getCatchedExceptions() {
return this.catchedExceptions;
}
}
......@@ -23,7 +23,7 @@ import teetime.framework.Stage;
public interface ISignal {
void trigger(Stage stage);
void trigger(Stage stage) throws Exception;
// Only used by the merger so far
boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.signal;
import java.util.List;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class InitializingSignal extends AbstractSignal {
@Override
public void trigger(final Stage stage) {
try {
stage.onInitializing();
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the initializing signal", e);
}
}
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return true;
}
}
......@@ -21,16 +21,11 @@ import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class StartingSignal extends AbstractSignal {
public final class StartingSignal implements ISignal {
@Override
public void trigger(final Stage stage) {
try {
stage.onStarting();
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the start signal", e);
}
public void trigger(final Stage stage) throws Exception {
stage.onStarting();
}
@Override
......
......@@ -21,16 +21,11 @@ import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class TerminatingSignal extends AbstractSignal {
public final class TerminatingSignal implements ISignal {
@Override
public void trigger(final Stage stage) {
try {
stage.onTerminating();
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the termination signal", e);
}
public void trigger(final Stage stage) throws Exception {
stage.onTerminating();
}
@Override
......
......@@ -18,11 +18,10 @@ package teetime.stage.basic.distributor.dynamic;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.util.framework.port.PortAction;
......@@ -51,7 +50,6 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment