diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index 81a3c5a15d8004dbf0d329445442258b9da8425c..367636ec51ce16e6002d80be7c02ad5a2a1b8531 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -1,17 +1,23 @@ package teetime.framework; -import org.junit.Test; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; -import teetime.stage.InitialElementProducer; +import org.junit.Test; public class TerminationTest { - @Test(timeout = 2000) + @Test(timeout = 3000) public void doesNotGetStuckInAdd() throws InterruptedException { - Execution<TerminationConfig> execution = new Execution<TerminationConfig>(new TerminationConfig(1)); + TerminationConfig configuration = new TerminationConfig(1); + Execution<TerminationConfig> execution = new Execution<TerminationConfig>(configuration); execution.executeNonBlocking(); - Thread.sleep(500); + Thread.sleep(100); execution.abortEventually(); + assertThat(configuration.sinkStage.time - 450, is(greaterThan(configuration.init.time))); } private class TerminationConfig extends Configuration { @@ -25,21 +31,50 @@ public class TerminationTest { } + private final class InitialElementProducer<T> extends AbstractProducerStage<T> { + + private final Iterable<T> elements; + public long time; + + public InitialElementProducer(final T... elements) { + this.elements = Arrays.asList(elements); + } + + @Override + protected void execute() { + for (final T element : this.elements) { + this.outputPort.send(element); + } + this.terminate(); + } + + @Override + protected void terminate() { + time = System.currentTimeMillis(); + super.terminate(); + } + + } + private class DoesNotRetrieveElements extends AbstractConsumerStage<Integer> { + public long time; + @Override protected void execute(final Integer element) { int i = 0; while (true) { i++; + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // First sleep will throw this + } if (i > 1) { Thread.currentThread().interrupt(); + time = System.currentTimeMillis(); break; } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } } }