diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 14868162c07e8f70f34d0ce4b1b9024e04d7b336..3057c121cc04fc684385fe8bf085e345f9619a27 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable { try { do { executeStage(); - } while (!stage.shouldBeTerminated()); + } while (!Thread.currentThread().isInterrupted()); } catch (TerminateException e) { this.stage.terminate(); stage.owningContext.abortConfigurationRun(); diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index ce339f90870866cbda5c89ba4801d14f70b1c828..81a3c5a15d8004dbf0d329445442258b9da8425c 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -8,9 +8,9 @@ public class TerminationTest { @Test(timeout = 2000) public void doesNotGetStuckInAdd() throws InterruptedException { - Execution<TerminationConfig> execution = new Execution<TerminationConfig>(new TerminationConfig()); + Execution<TerminationConfig> execution = new Execution<TerminationConfig>(new TerminationConfig(1)); execution.executeNonBlocking(); - Thread.sleep(1000); + Thread.sleep(500); execution.abortEventually(); } @@ -18,8 +18,8 @@ public class TerminationTest { InitialElementProducer<Integer> init = new InitialElementProducer<Integer>(1, 2, 3, 4, 5, 6); DoesNotRetrieveElements sinkStage = new DoesNotRetrieveElements(); - public TerminationConfig() { - connectPorts(init.getOutputPort(), sinkStage.getInputPort(), 1); + public TerminationConfig(final int capacity) { + connectPorts(init.getOutputPort(), sinkStage.getInputPort(), capacity); addThreadableStage(sinkStage); } @@ -29,19 +29,23 @@ public class TerminationTest { @Override protected void execute(final Integer element) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // Will happen in this test + int i = 0; + while (true) { + i++; + if (i > 1) { + Thread.currentThread().interrupt(); + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } } } @Override - protected void terminate() { - Thread.currentThread().interrupt(); - System.out.println("TADA " + this.shouldBeTerminated()); - } + protected void terminate() {} }