Skip to content
Snippets Groups Projects
Commit b1d8f7bf authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

moved executeStage to Stage

renamed exceptionHandler to exceptionListener
parent 8b598ca3
No related branches found
No related tags found
No related merge requests found
Showing
with 60 additions and 97 deletions
......@@ -15,9 +15,6 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
public abstract class AbstractConsumerStage<I> extends AbstractStage {
protected final InputPort<I> inputPort = this.createInputPort();
......@@ -27,20 +24,13 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage {
}
@Override
protected final void executeStage() {
protected final void execute() {
final I element = this.getInputPort().receive();
if (null == element) {
returnNoElement();
}
try {
this.execute(element);
} catch (Exception e) {
final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
this.execute(element);
}
protected abstract void execute(I element);
......
......@@ -15,9 +15,6 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
/**
* The <code>ProducerStage</code> produces at least one element at each execution.<br>
*
......@@ -35,23 +32,9 @@ public abstract class AbstractProducerStage<O> extends AbstractStage {
return this.outputPort;
}
@Override
protected void executeStage() {
try {
this.execute();
} catch (Exception e) {
final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
@Override
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SELF_DECISION;
}
protected abstract void execute();
}
......@@ -18,7 +18,7 @@ package teetime.framework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.StageException;
import teetime.framework.exceptionHandling.TerminateException;
abstract class AbstractRunnableStage implements Runnable {
......@@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable {
do {
executeStage();
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
} catch (TerminateException e) {
this.stage.terminate();
throw e;
} finally {
......
......@@ -260,16 +260,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
// TODO: implement
private void abortEventually() {
for (Thread thread : this.finiteProducerThreads) {
thread.interrupt();
}
for (Thread thread : this.consumerThreads) {
thread.interrupt();
}
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
for (Stage stage : configuration.getContext().getThreadableStages().keySet()) {
stage.terminate();
}
}
......@@ -347,7 +339,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
/**
* @return
* the given ExceptionListenerFactory instance
* the given ExceptionListenerFactory instance
*
* @since 2.0
*/
......
......@@ -23,6 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
......@@ -43,7 +45,7 @@ public abstract class Stage {
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
protected AbstractExceptionListener exceptionHandler;
protected AbstractExceptionListener exceptionListener;
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
protected Thread owningThread;
......@@ -101,7 +103,22 @@ public abstract class Stage {
*/
public abstract void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections);
protected abstract void executeStage();
protected void executeStage() {
try {
this.execute();
} catch (NotEnoughInputException e) {
throw e;
} catch (TerminateException e) {
throw e;
} catch (Exception e) {
final FurtherExecution furtherExecution = this.exceptionListener.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw TerminateException.INSTANCE;
}
}
}
protected abstract void execute();
protected abstract void onSignal(ISignal signal, InputPort<?> inputPort);
......@@ -147,7 +164,7 @@ public abstract class Stage {
public abstract void onTerminating() throws Exception;
protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
this.exceptionHandler = exceptionHandler;
this.exceptionListener = exceptionHandler;
}
protected abstract void removeDynamicPort(OutputPort<?> outputPort);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
public final class TerminateException extends RuntimeException {
private static final long serialVersionUID = 6841651916837487909L;
@SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
......@@ -15,7 +15,7 @@
*/
package teetime.framework.exceptionHandling;
import teetime.framework.Stage;
import teetime.util.StacklessException;
/**
* Represents an Exception, which is thrown by stages in case of they import teetime.framework.Stage;
......@@ -23,32 +23,17 @@ import teetime.framework.Stage;
*
* @since 1.1
*/
public class StageException extends RuntimeException {
public class TerminateException extends StacklessException {
public static final TerminateException INSTANCE = new TerminateException("Framework Exception");
/**
* Generated UID
*/
private static final long serialVersionUID = 6724637605943897808L;
private final Stage throwingStage;
public StageException(final Exception e, final Stage throwingStage) {
super(e);
this.throwingStage = throwingStage;
}
/**
* Returns the stage, which failed with an uncatched exception
*
* @return stage instance, which throws the exception
*/
public Stage getThrowingStage() {
return throwingStage;
}
@Override
public String toString() {
return getCause() + " in " + throwingStage.getId();
}
private TerminateException(final String string) {
super(string);
};
}
......@@ -82,4 +82,10 @@ public final class Delay<T> extends AbstractStage {
return this.outputPort;
}
@Override
protected void execute() {
// TODO Auto-generated method stub
}
}
......@@ -119,4 +119,10 @@ public class Merger<T> extends AbstractStage {
return this.outputPort;
}
@Override
protected void execute() {
// TODO Auto-generated method stub
}
}
......@@ -19,6 +19,10 @@ public class StacklessException extends RuntimeException {
private static final long serialVersionUID = -9040980547278981254L;
public StacklessException(final String string) {
super(string);
}
@Override
public synchronized Throwable fillInStackTrace() { // greatly improves performance when constructing
return this;
......
......@@ -49,8 +49,8 @@ public class StageTest {
TestConfig tc = new TestConfig();
new Execution<TestConfig>(tc);
assertEquals(tc.init.owningThread, tc.delay.owningThread);
assertThat(tc.delay.exceptionHandler, is(notNullValue()));
assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler);
assertThat(tc.delay.exceptionListener, is(notNullValue()));
assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener);
}
private static class TestConfig extends Configuration {
......
......@@ -55,4 +55,12 @@ public class TokenizerTest {
assertThat(results, contains("Hello", "World"));
}
public static void main(final String[] args) {
TokenizerTest toker = new TokenizerTest();
for (int i = 0; i < 1000; i++) {
toker.initializeTokenizer();
toker.tokenizerShouldSplitMultipleToken();
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment