diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index e2c7d3492b7908bce21debc8024a696a603cabc1..5471576ba9def3c1a59caa923df4237d2495559b 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -21,15 +21,17 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { @Override public void sendSignal(final ISignal signal) { this.signalQueue.offer(signal); - System.out.println("send signal: " + signal + " to " + cachedTargetStage); Thread owningThread = cachedTargetStage.getOwningThread(); - if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { + if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance owningThread.interrupt(); - System.out.println("interrupted " + owningThread); } } + protected boolean isThreadWaiting(final Thread thread) { + return thread.getState() == State.WAITING || thread.getState() == State.TIMED_WAITING; + } + /** * Retrieves and removes the head of the signal queue * diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 7564e870a452593ff7ebd53e3b8f107095a080c9..c4b474640eae022cbd2da54be1e4b628b848ae2b 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -1,6 +1,5 @@ package teetime.framework.pipe; -import java.lang.Thread.State; import java.util.Queue; import org.jctools.queues.QueueFactory; @@ -41,7 +40,7 @@ public final class SpScPipe extends AbstractInterThreadPipe { } Thread owningThread = cachedTargetStage.getOwningThread(); - if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { + if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance synchronized (cachedTargetStage) { cachedTargetStage.notify(); // LOGGER.trace("Notified: " + cachedTargetStage); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index 4399cd68026aec3cbc667937e5e208b255594805..ab2f75b04e4953571e0de17fffd9741254f45560 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -9,8 +9,6 @@ import org.junit.Test; import teetime.util.Pair; -import com.google.common.base.Joiner; - public class RunnableConsumerStageTest { @Test @@ -22,7 +20,7 @@ public class RunnableConsumerStageTest { Thread thread = new Thread(new Runnable() { @Override public void run() { - start(analysis); + start(analysis); // FIXME react on exceptions } }); thread.start(); @@ -42,7 +40,7 @@ public class RunnableConsumerStageTest { Thread thread = new Thread(new Runnable() { @Override public void run() { - start(analysis); + start(analysis); // FIXME react on exceptions } }); thread.start(); @@ -70,8 +68,9 @@ public class RunnableConsumerStageTest { private void start(final Analysis analysis) { Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); for (Pair<Thread, Throwable> pair : exceptions) { - System.out.println(pair.getSecond()); - System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); + // System.out.println(pair.getSecond()); + // System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); + throw new RuntimeException(pair.getSecond()); } assertEquals(0, exceptions.size()); }