From 13e4fa2fcfb39245711498531e0509486f70639e Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 18 Dec 2014 20:05:09 +0100 Subject: [PATCH] added tests for idle strategies --- .../framework/AbstractConsumerStage.java | 11 +++ .../framework/AbstractInterThreadPipe.java | 4 +- .../java/teetime/framework/AbstractStage.java | 12 ++- src/main/java/teetime/framework/Analysis.java | 8 +- .../framework/RunnableConsumerStage.java | 12 ++- src/main/java/teetime/framework/Stage.java | 2 + .../java/teetime/framework/pipe/SpScPipe.java | 5 +- .../teetime/stage/io/EveryXthPrinter.java | 5 ++ .../java/teetime/framework/OldPipeline.java | 5 ++ .../framework/RunnableConsumerStageTest.java | 80 +++++++++++++++++++ .../framework/WaitStrategyConfiguration.java | 70 ++++++++++++++++ .../framework/YieldStrategyConfiguration.java | 51 ++++++++++++ 12 files changed, 255 insertions(+), 10 deletions(-) create mode 100644 src/test/java/teetime/framework/RunnableConsumerStageTest.java create mode 100644 src/test/java/teetime/framework/WaitStrategyConfiguration.java create mode 100644 src/test/java/teetime/framework/YieldStrategyConfiguration.java diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java index 66333405..7346e281 100644 --- a/src/main/java/teetime/framework/AbstractConsumerStage.java +++ b/src/main/java/teetime/framework/AbstractConsumerStage.java @@ -1,9 +1,13 @@ package teetime.framework; +import teetime.framework.idle.IdleStrategy; + public abstract class AbstractConsumerStage<I> extends AbstractStage { protected final InputPort<I> inputPort = this.createInputPort(); + private IdleStrategy idleStrategy; // FIXME remove this word-around + public final InputPort<I> getInputPort() { return this.inputPort; } @@ -17,4 +21,11 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage { protected abstract void execute(I element); + public IdleStrategy getIdleStrategy() { + return idleStrategy; + } + + public void setIdleStrategy(final IdleStrategy idleStrategy) { + this.idleStrategy = idleStrategy; + } } diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 10646a4a..e2c7d349 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,12 +24,10 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { System.out.println("send signal: " + signal + " to " + cachedTargetStage); Thread owningThread = cachedTargetStage.getOwningThread(); - if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { owningThread.interrupt(); System.out.println("interrupted " + owningThread); } - - System.out.println("Signal sent."); } /** diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index bfcfe993..174f5bd7 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -23,6 +23,8 @@ public abstract class AbstractStage extends Stage { private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); private boolean shouldTerminate; + private boolean started; + private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected @@ -37,7 +39,9 @@ public abstract class AbstractStage extends Stage { */ @Override public InputPort<?>[] getInputPorts() { - return this.cachedInputPorts; + // return this.cachedInputPorts; + System.out.println("inputPortList: " + inputPortList); + return inputPortList.toArray(new InputPort<?>[0]); } /** @@ -54,6 +58,7 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); + started = true; for (OutputPort<?> outputPort : this.outputPortList) { outputPort.sendSignal(signal); @@ -61,6 +66,11 @@ public abstract class AbstractStage extends Stage { } } + @Override + public boolean isStarted() { + return started; + } + /** * @param signal * arriving signal diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 3f14c1aa..75485f6a 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -72,7 +72,13 @@ public class Analysis implements UncaughtExceptionHandler { for (Stage stage : threadableStageJobs) { switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { - final Thread thread = new Thread(new RunnableConsumerStage(stage)); + RunnableConsumerStage runnable; + if (stage instanceof AbstractConsumerStage<?>) { + runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around + } else { + runnable = new RunnableConsumerStage(stage); + } + final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.consumerThreads.add(thread); break; diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 8a74b08f..d2a6ebf2 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -1,5 +1,7 @@ package teetime.framework; +import java.util.Arrays; + import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.YieldStrategy; import teetime.framework.pipe.IPipe; @@ -20,13 +22,14 @@ public final class RunnableConsumerStage extends RunnableStage { @Override protected void beforeStageExecution() { - // TODO wait for starting signal + logger.trace("ENTRY beforeStageExecution"); + do { checkforSignals(); - // logger.trace("Signals checked."); Thread.yield(); - } while (stage.getInputPorts().length == 0); - logger.debug("Stage initialized"); + } while (!stage.isStarted()); + + logger.trace("EXIT beforeStageExecution"); } @Override @@ -50,6 +53,7 @@ public final class RunnableConsumerStage extends RunnableStage { private void checkforSignals() { // FIXME should getInputPorts() really be defined in Stage? InputPort<?>[] inputPorts = stage.getInputPorts(); + logger.debug("inputPorts: " + Arrays.toString(inputPorts)); for (InputPort<?> inputPort : inputPorts) { IPipe pipe = inputPort.getPipe(); if (pipe instanceof AbstractInterThreadPipe) { diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index f709fc50..d7f0b043 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -94,4 +94,6 @@ public abstract class Stage { } protected abstract InputPort<?>[] getInputPorts(); + + protected abstract boolean isStarted(); } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index f32e2fbe..58a3789d 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -38,10 +38,13 @@ public final class SpScPipe extends AbstractInterThreadPipe { Thread.yield(); } + System.out.println("Added: " + element); + Thread owningThread = cachedTargetStage.getOwningThread(); - if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { synchronized (cachedTargetStage) { cachedTargetStage.notify(); + System.out.println("Notified: " + cachedTargetStage); } } diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index e76e71d2..0d686f10 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -75,4 +75,9 @@ public final class EveryXthPrinter<T> extends Stage { return distributor.getInputPorts(); } + @Override + protected boolean isStarted() { + return distributor.isStarted(); + } + } diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 7109b83d..3ee0a72a 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -72,4 +72,9 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte return firstStage.getTerminationStrategy(); } + @Override + protected boolean isStarted() { + return firstStage.isStarted(); + } + } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java new file mode 100644 index 00000000..c892f849 --- /dev/null +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -0,0 +1,80 @@ +package teetime.framework; + +import static org.junit.Assert.assertEquals; + +import java.lang.Thread.State; +import java.util.Collection; + +import org.junit.Test; + +import teetime.util.Pair; + +import com.google.common.base.Joiner; + +public class RunnableConsumerStageTest { + + @Test + public void testWaitingInfinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(5000, 1); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); + } + }); + thread.start(); + + Thread.sleep(200); + + assertEquals(State.WAITING, thread.getState()); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testWaitingFinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 1); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); + } + }); + thread.start(); + + Thread.sleep(200); + assertEquals(State.WAITING, thread.getState()); + + Thread.sleep(500); + assertEquals(State.TERMINATED, thread.getState()); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testSimpleRun() throws Exception { + YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + + start(analysis); + + assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + private void start(final Analysis analysis) { + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + for (Pair<Thread, Throwable> pair : exceptions) { + System.out.println(pair.getSecond()); + System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); + } + assertEquals(0, exceptions.size()); + } +} diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java new file mode 100644 index 00000000..62f9ebff --- /dev/null +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -0,0 +1,70 @@ +package teetime.framework; + +import teetime.framework.idle.WaitStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.Clock; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; +import teetime.stage.basic.Delay; + +class WaitStrategyConfiguration extends AnalysisConfiguration { + + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private Delay<Object> delay; + private CollectorSink<Object> collectorSink; + + public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + Stage producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(delay); + addThreadableStage(consumer); + + Clock clock = buildClock(initialDelayInMs, delay); + addThreadableStage(clock); + } + + private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { + Clock clock = new Clock(); + clock.setInitialDelayInMs(initialDelayInMs); + + interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + + return clock; + } + + private Stage buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + delay = new Delay<Object>(); + + intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort()); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final Delay<Object> delay) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new WaitStrategy(relay)); + + interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +} diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java new file mode 100644 index 00000000..57212fd9 --- /dev/null +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -0,0 +1,51 @@ +package teetime.framework; + +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; + +class YieldStrategyConfiguration extends AnalysisConfiguration { + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private CollectorSink<Object> collectorSink; + + public YieldStrategyConfiguration(final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + InitialElementProducer<Object> producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(producer); + addThreadableStage(consumer); + } + + private InitialElementProducer<Object> buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final InitialElementProducer<Object> producer) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new YieldStrategy()); + + interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +} -- GitLab