Skip to content
Snippets Groups Projects
Commit 5536972e authored by Christian Wulf's avatar Christian Wulf
Browse files

added currentState to Stage;

fixed termination bug in RunnableConsumerStage;
added check in StageTester;
removed isStarted() in AbstractStage
parent 628a087b
No related branches found
No related tags found
No related merge requests found
......@@ -31,16 +31,14 @@ public abstract class AbstractStage extends Stage {
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
protected InputPort<?>[] cachedInputPorts = new InputPort[0];
/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
protected OutputPort<?>[] cachedOutputPorts;
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
// BETTER aggregate both states in an enum
private boolean shouldTerminate;
private boolean started;
/** The current state of this stage */
private StageState currentState = StageState.CREATED;
/**
* @return the stage's input ports
......@@ -58,6 +56,11 @@ public abstract class AbstractStage extends Stage {
return this.cachedOutputPorts;
}
@Override
public StageState getCurrentState() {
return currentState;
}
/**
* May not be invoked outside of IPipe implementations
*/
......@@ -73,11 +76,6 @@ public abstract class AbstractStage extends Stage {
}
}
@Override
public boolean isStarted() {
return started;
}
/**
* @param signal
* arriving signal
......@@ -103,6 +101,7 @@ public abstract class AbstractStage extends Stage {
@Override
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections);
currentState = StageState.VALIDATED;
}
@Override
......@@ -112,7 +111,7 @@ public abstract class AbstractStage extends Stage {
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
started = true;
currentState = StageState.STARTED;
logger.debug("Started.");
}
......@@ -129,7 +128,7 @@ public abstract class AbstractStage extends Stage {
@Override
public void onTerminating() throws Exception {
logger.trace("onTerminating: " + this.getId());
this.terminate();
currentState = StageState.TERMINATED;
}
/**
......@@ -174,17 +173,17 @@ public abstract class AbstractStage extends Stage {
}
@Override
public void terminate() {
this.shouldTerminate = true;
protected void terminate() {
currentState = StageState.TERMINATING;
}
@Override
public boolean shouldBeTerminated() {
return this.shouldTerminate;
protected boolean shouldBeTerminated() {
return (currentState == StageState.TERMINATING);
}
@Override
public TerminationStrategy getTerminationStrategy() {
protected TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SIGNAL;
}
......
......
......@@ -73,19 +73,15 @@ public abstract class CompositeStage extends Stage {
}
@Override
public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (final Stage s : getLastStages()) {
s.validateOutputPorts(invalidPortConnections);
}
public final StageState getCurrentState() {
return getFirstStage().getCurrentState();
}
@Override
protected final boolean isStarted() {
boolean isStarted = true;
public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (final Stage s : getLastStages()) {
isStarted = isStarted && s.isStarted();
s.validateOutputPorts(invalidPortConnections);
}
return isStarted;
}
@Override
......
......
......@@ -71,15 +71,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
}
final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) {
stage.onSignal(signal, inputPort);
}
stage.terminate();
}
@Override
protected void afterStageExecution(final Stage stage) {
// do nothing
final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) {
stage.onSignal(signal, inputPort);
}
}
}
......@@ -106,6 +106,8 @@ public abstract class Stage {
protected abstract boolean shouldBeTerminated();
public abstract StageState getCurrentState();
public Thread getOwningThread() {
return owningThread;
}
......@@ -116,8 +118,6 @@ public abstract class Stage {
protected abstract InputPort<?>[] getInputPorts();
protected abstract boolean isStarted();
// events
public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections);
......
......
package teetime.framework;
public enum StageState {
CREATED,
VALIDATING, VALIDATED,
STARTING, STARTED,
TERMINATING, TERMINATED
}
......@@ -3,7 +3,6 @@ package teetime.framework.test;
import java.util.List;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
public final class OutputHolder<O> {
......@@ -13,7 +12,7 @@ public final class OutputHolder<O> {
private OutputPort<Object> port;
@SuppressWarnings("unchecked")
OutputHolder(final StageTester stageTester, final Stage stage, final List<O> outputList) {
OutputHolder(final StageTester stageTester, final List<O> outputList) {
this.stageTester = stageTester;
this.outputElements = (List<Object>) outputList;
}
......
......
......@@ -23,6 +23,7 @@ import java.util.List;
import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.framework.StageState;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
......@@ -46,6 +47,9 @@ public final class StageTester {
}
public static StageTester test(final Stage stage) {
if (stage.getCurrentState() != StageState.CREATED) {
throw new AssertionError("This stage has already been tested in this test method. Move this test into a new test method.");
}
return new StageTester(stage);
}
......@@ -60,7 +64,7 @@ public final class StageTester {
}
public <O> OutputHolder<O> receive(final List<O> outputList) {
final OutputHolder<O> outputHolder = new OutputHolder<O>(this, stage, outputList);
final OutputHolder<O> outputHolder = new OutputHolder<O>(this, outputList);
this.outputHolders.add(outputHolder);
return outputHolder;
}
......@@ -79,7 +83,6 @@ public final class StageTester {
public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) {
final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
for (InputHolder<?> inputHolder : inputHolders) {
final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput());
interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort());
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment