diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java index b90843ce5a2e0e777d6a4358626895de4ca3d559..0b010ba95aa480874cb5d17aedc0a6c58963cb9e 100644 --- a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java +++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java @@ -32,6 +32,9 @@ public final class SCParkTakeStrategy<E> implements TakeStrategy<E> { while ((e = q.poll()) == null) { LockSupport.park(); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Interrupted while waiting for the queue to become non-empty."); + } } t.lazySet(null); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index cb9d2e28a3726232e43ef086b5abdeea1d8468e9..225eac81a88aa35a13c2b1638a6dec182c118644 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -17,8 +17,10 @@ package teetime.framework; import static org.junit.Assert.assertEquals; +import java.lang.Thread.State; import java.util.Collection; +import org.junit.Ignore; import org.junit.Test; import teetime.util.Pair; @@ -27,6 +29,31 @@ import com.google.common.base.Joiner; public class RunnableConsumerStageTest { + @Test + public void testWaitingInfinitely() throws Exception { + RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(); + + final Analysis analysis = new Analysis(configuration); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); + } + }); + thread.start(); + + Thread.sleep(200); + + // assertEquals(State.WAITING, thread.getState()); + assertEquals(State.WAITING, configuration.getConsumerThread().getState()); + assertEquals(0, configuration.getCollectedElements().size()); + + // clean up + configuration.getConsumerThread().interrupt(); + configuration.getConsumerThread().join(); + thread.join(); + } + // @Test // public void testWaitingInfinitely() throws Exception { // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); @@ -45,7 +72,7 @@ public class RunnableConsumerStageTest { // assertEquals(State.WAITING, thread.getState()); // assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); // } - // + // @Test // public void testWaitingFinitely() throws Exception { // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); @@ -66,6 +93,7 @@ public class RunnableConsumerStageTest { // assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); // } + @Ignore @Test public void testYieldRun() throws Exception { YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..1eb57d6b74b1109bf1a1b958bdf036dc37b1bead --- /dev/null +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -0,0 +1,37 @@ +package teetime.framework; + +import java.util.ArrayList; +import java.util.List; + +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; + +public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguration { + + private final List<Integer> collectedElements = new ArrayList<Integer>(); + private final CollectorSink<Integer> collectorSink; + + public RunnableConsumerStageTestConfiguration(final Integer... inputElements) { + InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements); + // addThreadableStage(producer); + + CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); + addThreadableStage(collectorSink); + + IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + pipeFactory.create(producer.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + } + + public List<Integer> getCollectedElements() { + return collectedElements; + } + + public Thread getConsumerThread() { + return collectorSink.getOwningThread(); + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 8a46d9dceb368d68288f19ace8edb4b8a6c2631b..a8db8b1d8aab88caf50bc8d38b250f8df5642e8e 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -20,9 +20,9 @@ </encoder> </appender> + <logger name="teetime" level="INFO" /> <!-- <logger name="teetime.framework" level="TRACE" /> --> <!-- <logger name="teetime.stage" level="TRACE" /> --> - <logger name="teetime" level="INFO" /> <logger name="util" level="INFO" /> <root level="WARN">