diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index a76c11505d77291a73cc5ed02d8aefd041bcfa3a..e2f87810a016fdadbf0e6221666c7d2937afa194 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -51,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable { try { do { executeStage(); - } while (!Thread.currentThread().isInterrupted()); + } while (!stage.shouldBeTerminated()); } catch (TerminateException e) { this.stage.terminate(); stage.getOwningContext().abortConfigurationRun(); diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index 84bc5425024cf422150d47bafa8383330b05f8c9..10d00f1c2e27bc2c2414601a17104e395077b37d 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -19,10 +19,9 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import java.util.Arrays; - import org.junit.Test; +import teetime.stage.InitialElementProducer; import teetime.stage.basic.Sink; public class TerminationTest { @@ -42,16 +41,20 @@ public class TerminationTest { execution.executeNonBlocking(); Thread.sleep(100); execution.abortEventually(); - assertThat(configuration.sinkStage.time - 450, is(greaterThan(configuration.init.time))); + assertThat(configuration.finalProp.time - 450, is(greaterThan(configuration.firstProp.time))); } private class TerminationConfig extends Configuration { InitialElementProducer<Integer> init = new InitialElementProducer<Integer>(1, 2, 3, 4, 5, 6); + Propagator firstProp = new Propagator(); DoesNotRetrieveElements sinkStage = new DoesNotRetrieveElements(); + Propagator finalProp = new Propagator(); public TerminationConfig(final int capacity) { if (capacity == 1) { - connectPorts(init.getOutputPort(), sinkStage.getInputPort(), capacity); + connectPorts(init.getOutputPort(), firstProp.getInputPort()); + connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity); + connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort()); addThreadableStage(sinkStage); } else { Sink<Integer> sink = new Sink<Integer>(); @@ -62,34 +65,9 @@ public class TerminationTest { } - private final class InitialElementProducer<T> extends AbstractProducerStage<T> { - - private final Iterable<T> elements; - public long time; - - public InitialElementProducer(final T... elements) { - this.elements = Arrays.asList(elements); - } - - @Override - protected void execute() { - for (final T element : this.elements) { - this.outputPort.send(element); - } - this.terminate(); - } - - @Override - protected void terminate() { - time = System.currentTimeMillis(); - super.terminate(); - } - - } - private class DoesNotRetrieveElements extends AbstractConsumerStage<Integer> { - public long time; + private final OutputPort<Integer> output = createOutputPort(); @Override protected void execute(final Integer element) { @@ -102,17 +80,41 @@ public class TerminationTest { // First sleep will throw this } if (i > 1) { - Thread.currentThread().interrupt(); - time = System.currentTimeMillis(); + super.terminate(); break; } } } + public OutputPort<? extends Integer> getOutputPort() { + return output; + } + @Override protected void terminate() {} } + private class Propagator extends AbstractConsumerStage<Integer> { + + public long time; + private final OutputPort<Integer> output = createOutputPort(); + + @Override + protected void execute(final Integer element) { + output.send(element); + } + + public OutputPort<? extends Integer> getOutputPort() { + return output; + } + + @Override + public void onTerminating() throws Exception { + time = System.currentTimeMillis(); + super.onTerminating(); + } + } + }