From b273388e6decee18bc79e41673894cd0481e73a4 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 17 Dec 2014 14:55:53 +0100 Subject: [PATCH] added idle strategies --- .../framework/NotEnoughInputException.java | 2 +- .../framework/RunnableConsumerStage.java | 45 +++++++++++++++++++ .../framework/RunnableProducerStage.java | 5 +++ .../java/teetime/framework/RunnableStage.java | 14 ++---- .../teetime/framework/idle/IdleStrategy.java | 6 +++ .../teetime/framework/idle/SleepStrategy.java | 17 +++++++ .../teetime/framework/idle/WaitStrategy.java | 21 +++++++++ .../teetime/framework/idle/YieldStrategy.java | 10 +++++ src/main/java/teetime/stage/Relay.java | 20 ++++++--- 9 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 src/main/java/teetime/framework/idle/IdleStrategy.java create mode 100644 src/main/java/teetime/framework/idle/SleepStrategy.java create mode 100644 src/main/java/teetime/framework/idle/WaitStrategy.java create mode 100644 src/main/java/teetime/framework/idle/YieldStrategy.java diff --git a/src/main/java/teetime/framework/NotEnoughInputException.java b/src/main/java/teetime/framework/NotEnoughInputException.java index a26f482b..dc241b82 100644 --- a/src/main/java/teetime/framework/NotEnoughInputException.java +++ b/src/main/java/teetime/framework/NotEnoughInputException.java @@ -1,6 +1,6 @@ package teetime.framework; -final class NotEnoughInputException extends RuntimeException { +public final class NotEnoughInputException extends RuntimeException { private static final long serialVersionUID = -2517233596919204396L; diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 3b111a94..27e5e1d9 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -1,9 +1,21 @@ package teetime.framework; +import teetime.framework.idle.IdleStrategy; +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipe; +import teetime.framework.signal.ISignal; + public final class RunnableConsumerStage extends RunnableStage { + private final IdleStrategy idleStrategy; + public RunnableConsumerStage(final Stage stage) { + this(stage, new YieldStrategy()); + } + + public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { super(stage); + this.idleStrategy = idleStrategy; } @Override @@ -11,6 +23,39 @@ public final class RunnableConsumerStage extends RunnableStage { // TODO wait for starting signal } + @Override + protected void executeStage() { + try { + this.stage.executeWithPorts(); + } catch (NotEnoughInputException e) { + checkforSignals(); // check for termination + executeIdleStrategy(); + } + } + + private void executeIdleStrategy() { + try { + idleStrategy.execute(); + } catch (InterruptedException e) { + checkforSignals(); // check for termination + } + } + + private void checkforSignals() { + // FIXME consider to use AbstractStage or to move getInputPorts() to Stage or... + InputPort<?>[] inputPorts = ((AbstractStage) stage).getInputPorts(); + for (InputPort<?> inputPort : inputPorts) { + IPipe pipe = inputPort.getPipe(); + if (pipe instanceof AbstractInterThreadPipe) { + AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe; + ISignal signal = intraThreadPipe.getSignal(); + if (null != signal) { + stage.onSignal(signal, inputPort); + } + } + } + } + @Override protected void afterStageExecution() { // do nothing diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 35aeb24e..a2941b06 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -15,6 +15,11 @@ public final class RunnableProducerStage extends RunnableStage { this.stage.onSignal(startingSignal, null); } + @Override + protected void executeStage() { + this.stage.executeWithPorts(); + } + @Override protected void afterStageExecution() { final TerminatingSignal terminatingSignal = new TerminatingSignal(); diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index 7f92379d..f23e7ed8 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -22,17 +22,7 @@ abstract class RunnableStage implements Runnable { beforeStageExecution(); do { - try { - this.stage.executeWithPorts(); - } catch (NotEnoughInputException e) { - // 1. check for terminating signal - // new Thread().getState() == State.WAITING - - // 2. check for no input reaction: this.getStrategy() - // 2.1 if BUSY_WAITING with timeout to then sleep(to) - // 2.2 if BLOCKING_WAIT then - - } + executeStage(); } while (!this.stage.shouldBeTerminated()); afterStageExecution(); @@ -47,5 +37,7 @@ abstract class RunnableStage implements Runnable { protected abstract void beforeStageExecution(); + protected abstract void executeStage(); + protected abstract void afterStageExecution(); } diff --git a/src/main/java/teetime/framework/idle/IdleStrategy.java b/src/main/java/teetime/framework/idle/IdleStrategy.java new file mode 100644 index 00000000..b9dc3524 --- /dev/null +++ b/src/main/java/teetime/framework/idle/IdleStrategy.java @@ -0,0 +1,6 @@ +package teetime.framework.idle; + +public interface IdleStrategy { + + void execute() throws InterruptedException; +} diff --git a/src/main/java/teetime/framework/idle/SleepStrategy.java b/src/main/java/teetime/framework/idle/SleepStrategy.java new file mode 100644 index 00000000..3fabdc2e --- /dev/null +++ b/src/main/java/teetime/framework/idle/SleepStrategy.java @@ -0,0 +1,17 @@ +package teetime.framework.idle; + +public class SleepStrategy implements IdleStrategy { + + private final long timeoutInMs; + + public SleepStrategy(final long timeoutInMs) { + super(); + this.timeoutInMs = timeoutInMs; + } + + @Override + public void execute() throws InterruptedException { + Thread.sleep(timeoutInMs); + } + +} diff --git a/src/main/java/teetime/framework/idle/WaitStrategy.java b/src/main/java/teetime/framework/idle/WaitStrategy.java new file mode 100644 index 00000000..6a4c5e7f --- /dev/null +++ b/src/main/java/teetime/framework/idle/WaitStrategy.java @@ -0,0 +1,21 @@ +package teetime.framework.idle; + +import teetime.framework.Stage; + +public final class WaitStrategy implements IdleStrategy { + + private final Stage stage; + + public WaitStrategy(final Stage stage) { + super(); + this.stage = stage; + } + + @Override + public void execute() throws InterruptedException { + synchronized (stage) { + stage.wait(); + } + } + +} diff --git a/src/main/java/teetime/framework/idle/YieldStrategy.java b/src/main/java/teetime/framework/idle/YieldStrategy.java new file mode 100644 index 00000000..80a508d9 --- /dev/null +++ b/src/main/java/teetime/framework/idle/YieldStrategy.java @@ -0,0 +1,10 @@ +package teetime.framework.idle; + +public class YieldStrategy implements IdleStrategy { + + @Override + public void execute() throws InterruptedException { + Thread.yield(); + } + +} diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 4c1c19a2..1840dac6 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -3,7 +3,7 @@ package teetime.stage; import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractProducerStage; import teetime.framework.InputPort; -import teetime.framework.signal.TerminatingSignal; +import teetime.framework.NotEnoughInputException; public final class Relay<T> extends AbstractProducerStage<T> { @@ -11,20 +11,26 @@ public final class Relay<T> extends AbstractProducerStage<T> { private AbstractInterThreadPipe cachedCastedInputPipe; + private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); + @Override public void execute() { T element = this.inputPort.receive(); if (null == element) { - if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { - this.terminate(); - } - Thread.yield(); - return; - // returnNoElement(); + // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { + // this.terminate(); + // } + // Thread.yield(); + // return; + returnNoElement(); } outputPort.send(element); } + private void returnNoElement() { + throw NOT_ENOUGH_INPUT_EXCEPTION; + } + @Override public void onStarting() throws Exception { super.onStarting(); -- GitLab