diff --git a/src/main/java/teetime/framework/NotEnoughInputException.java b/src/main/java/teetime/framework/NotEnoughInputException.java index a26f482b90a082882aa6e62de0ddf828d16fbdc1..dc241b8212c8fb16ffc05532f3672cb843e482b0 100644 --- a/src/main/java/teetime/framework/NotEnoughInputException.java +++ b/src/main/java/teetime/framework/NotEnoughInputException.java @@ -1,6 +1,6 @@ package teetime.framework; -final class NotEnoughInputException extends RuntimeException { +public final class NotEnoughInputException extends RuntimeException { private static final long serialVersionUID = -2517233596919204396L; diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 3b111a944f15fad76a73eb11a4682cb806e93a65..27e5e1d9d99a6efabc87546abad44b347c99596a 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -1,9 +1,21 @@ package teetime.framework; +import teetime.framework.idle.IdleStrategy; +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipe; +import teetime.framework.signal.ISignal; + public final class RunnableConsumerStage extends RunnableStage { + private final IdleStrategy idleStrategy; + public RunnableConsumerStage(final Stage stage) { + this(stage, new YieldStrategy()); + } + + public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { super(stage); + this.idleStrategy = idleStrategy; } @Override @@ -11,6 +23,39 @@ public final class RunnableConsumerStage extends RunnableStage { // TODO wait for starting signal } + @Override + protected void executeStage() { + try { + this.stage.executeWithPorts(); + } catch (NotEnoughInputException e) { + checkforSignals(); // check for termination + executeIdleStrategy(); + } + } + + private void executeIdleStrategy() { + try { + idleStrategy.execute(); + } catch (InterruptedException e) { + checkforSignals(); // check for termination + } + } + + private void checkforSignals() { + // FIXME consider to use AbstractStage or to move getInputPorts() to Stage or... + InputPort<?>[] inputPorts = ((AbstractStage) stage).getInputPorts(); + for (InputPort<?> inputPort : inputPorts) { + IPipe pipe = inputPort.getPipe(); + if (pipe instanceof AbstractInterThreadPipe) { + AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe; + ISignal signal = intraThreadPipe.getSignal(); + if (null != signal) { + stage.onSignal(signal, inputPort); + } + } + } + } + @Override protected void afterStageExecution() { // do nothing diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 35aeb24efe3b90fedf1fb240da39adb82e9ee07c..a2941b066f0fd794781a07e99d5f6f6820366bc9 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -15,6 +15,11 @@ public final class RunnableProducerStage extends RunnableStage { this.stage.onSignal(startingSignal, null); } + @Override + protected void executeStage() { + this.stage.executeWithPorts(); + } + @Override protected void afterStageExecution() { final TerminatingSignal terminatingSignal = new TerminatingSignal(); diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index 7f92379d912567b6059603125ad8c18b6754d23d..f23e7ed8409390b2c62a25b731501e468628a7ca 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -22,17 +22,7 @@ abstract class RunnableStage implements Runnable { beforeStageExecution(); do { - try { - this.stage.executeWithPorts(); - } catch (NotEnoughInputException e) { - // 1. check for terminating signal - // new Thread().getState() == State.WAITING - - // 2. check for no input reaction: this.getStrategy() - // 2.1 if BUSY_WAITING with timeout to then sleep(to) - // 2.2 if BLOCKING_WAIT then - - } + executeStage(); } while (!this.stage.shouldBeTerminated()); afterStageExecution(); @@ -47,5 +37,7 @@ abstract class RunnableStage implements Runnable { protected abstract void beforeStageExecution(); + protected abstract void executeStage(); + protected abstract void afterStageExecution(); } diff --git a/src/main/java/teetime/framework/idle/IdleStrategy.java b/src/main/java/teetime/framework/idle/IdleStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..b9dc35247a1c7fe86c3fa1d693e6fbfec3491f20 --- /dev/null +++ b/src/main/java/teetime/framework/idle/IdleStrategy.java @@ -0,0 +1,6 @@ +package teetime.framework.idle; + +public interface IdleStrategy { + + void execute() throws InterruptedException; +} diff --git a/src/main/java/teetime/framework/idle/SleepStrategy.java b/src/main/java/teetime/framework/idle/SleepStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..3fabdc2e645d084f88708f5a0dfd124a16b5624c --- /dev/null +++ b/src/main/java/teetime/framework/idle/SleepStrategy.java @@ -0,0 +1,17 @@ +package teetime.framework.idle; + +public class SleepStrategy implements IdleStrategy { + + private final long timeoutInMs; + + public SleepStrategy(final long timeoutInMs) { + super(); + this.timeoutInMs = timeoutInMs; + } + + @Override + public void execute() throws InterruptedException { + Thread.sleep(timeoutInMs); + } + +} diff --git a/src/main/java/teetime/framework/idle/WaitStrategy.java b/src/main/java/teetime/framework/idle/WaitStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..6a4c5e7f115d98ae212b87326b2e160fcc45937c --- /dev/null +++ b/src/main/java/teetime/framework/idle/WaitStrategy.java @@ -0,0 +1,21 @@ +package teetime.framework.idle; + +import teetime.framework.Stage; + +public final class WaitStrategy implements IdleStrategy { + + private final Stage stage; + + public WaitStrategy(final Stage stage) { + super(); + this.stage = stage; + } + + @Override + public void execute() throws InterruptedException { + synchronized (stage) { + stage.wait(); + } + } + +} diff --git a/src/main/java/teetime/framework/idle/YieldStrategy.java b/src/main/java/teetime/framework/idle/YieldStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..80a508d98936dbaa4cdd4fb6bb2dc4297d70e531 --- /dev/null +++ b/src/main/java/teetime/framework/idle/YieldStrategy.java @@ -0,0 +1,10 @@ +package teetime.framework.idle; + +public class YieldStrategy implements IdleStrategy { + + @Override + public void execute() throws InterruptedException { + Thread.yield(); + } + +} diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 4c1c19a2a306d07968ce3f12c0c1587e1b497fc6..1840dac654789144f2afc44848ce940f6d2a7487 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -3,7 +3,7 @@ package teetime.stage; import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractProducerStage; import teetime.framework.InputPort; -import teetime.framework.signal.TerminatingSignal; +import teetime.framework.NotEnoughInputException; public final class Relay<T> extends AbstractProducerStage<T> { @@ -11,20 +11,26 @@ public final class Relay<T> extends AbstractProducerStage<T> { private AbstractInterThreadPipe cachedCastedInputPipe; + private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); + @Override public void execute() { T element = this.inputPort.receive(); if (null == element) { - if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { - this.terminate(); - } - Thread.yield(); - return; - // returnNoElement(); + // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { + // this.terminate(); + // } + // Thread.yield(); + // return; + returnNoElement(); } outputPort.send(element); } + private void returnNoElement() { + throw NOT_ENOUGH_INPUT_EXCEPTION; + } + @Override public void onStarting() throws Exception { super.onStarting();