diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java index 66333405a17928b2faddcc948affe6e146c4e17a..7346e281fc93a3a52da90d5fdcedd3ff1b08897b 100644 --- a/src/main/java/teetime/framework/AbstractConsumerStage.java +++ b/src/main/java/teetime/framework/AbstractConsumerStage.java @@ -1,9 +1,13 @@ package teetime.framework; +import teetime.framework.idle.IdleStrategy; + public abstract class AbstractConsumerStage<I> extends AbstractStage { protected final InputPort<I> inputPort = this.createInputPort(); + private IdleStrategy idleStrategy; // FIXME remove this word-around + public final InputPort<I> getInputPort() { return this.inputPort; } @@ -17,4 +21,11 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage { protected abstract void execute(I element); + public IdleStrategy getIdleStrategy() { + return idleStrategy; + } + + public void setIdleStrategy(final IdleStrategy idleStrategy) { + this.idleStrategy = idleStrategy; + } } diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 10646a4a966e7e767a3bc05a80ccf2c02f656fe0..e2c7d3492b7908bce21debc8024a696a603cabc1 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -24,12 +24,10 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { System.out.println("send signal: " + signal + " to " + cachedTargetStage); Thread owningThread = cachedTargetStage.getOwningThread(); - if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { owningThread.interrupt(); System.out.println("interrupted " + owningThread); } - - System.out.println("Signal sent."); } /** diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index bfcfe9934b91a345980746a0366836fc9db3fd4c..174f5bd7f608bad8d5c955cea247394f747dbca4 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -23,6 +23,8 @@ public abstract class AbstractStage extends Stage { private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); private boolean shouldTerminate; + private boolean started; + private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected @@ -37,7 +39,9 @@ public abstract class AbstractStage extends Stage { */ @Override public InputPort<?>[] getInputPorts() { - return this.cachedInputPorts; + // return this.cachedInputPorts; + System.out.println("inputPortList: " + inputPortList); + return inputPortList.toArray(new InputPort<?>[0]); } /** @@ -54,6 +58,7 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); + started = true; for (OutputPort<?> outputPort : this.outputPortList) { outputPort.sendSignal(signal); @@ -61,6 +66,11 @@ public abstract class AbstractStage extends Stage { } } + @Override + public boolean isStarted() { + return started; + } + /** * @param signal * arriving signal diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 3f14c1aaa9883bde900f352846a42f5973f403a3..75485f6a0b678fa1018697dc8598de4d91dbe643 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -72,7 +72,13 @@ public class Analysis implements UncaughtExceptionHandler { for (Stage stage : threadableStageJobs) { switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { - final Thread thread = new Thread(new RunnableConsumerStage(stage)); + RunnableConsumerStage runnable; + if (stage instanceof AbstractConsumerStage<?>) { + runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around + } else { + runnable = new RunnableConsumerStage(stage); + } + final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.consumerThreads.add(thread); break; diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 8a74b08fb2db13ca7e0aa8190cf06b62dfa43eb2..d2a6ebf250a9e8020172473cb1295ac6b929e430 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -1,5 +1,7 @@ package teetime.framework; +import java.util.Arrays; + import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.YieldStrategy; import teetime.framework.pipe.IPipe; @@ -20,13 +22,14 @@ public final class RunnableConsumerStage extends RunnableStage { @Override protected void beforeStageExecution() { - // TODO wait for starting signal + logger.trace("ENTRY beforeStageExecution"); + do { checkforSignals(); - // logger.trace("Signals checked."); Thread.yield(); - } while (stage.getInputPorts().length == 0); - logger.debug("Stage initialized"); + } while (!stage.isStarted()); + + logger.trace("EXIT beforeStageExecution"); } @Override @@ -50,6 +53,7 @@ public final class RunnableConsumerStage extends RunnableStage { private void checkforSignals() { // FIXME should getInputPorts() really be defined in Stage? InputPort<?>[] inputPorts = stage.getInputPorts(); + logger.debug("inputPorts: " + Arrays.toString(inputPorts)); for (InputPort<?> inputPort : inputPorts) { IPipe pipe = inputPort.getPipe(); if (pipe instanceof AbstractInterThreadPipe) { diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index f709fc50d11f7a434cef194be25f9b31380bde89..d7f0b0435d4e698def04ddc808c6c7cf8fad4ea4 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -94,4 +94,6 @@ public abstract class Stage { } protected abstract InputPort<?>[] getInputPorts(); + + protected abstract boolean isStarted(); } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index f32e2fbec45041edf94959d497437376783269f2..58a3789d066dda5fb76a29c1ee2ffdc3fee9acdc 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -38,10 +38,13 @@ public final class SpScPipe extends AbstractInterThreadPipe { Thread.yield(); } + System.out.println("Added: " + element); + Thread owningThread = cachedTargetStage.getOwningThread(); - if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { + if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { synchronized (cachedTargetStage) { cachedTargetStage.notify(); + System.out.println("Notified: " + cachedTargetStage); } } diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index e76e71d23ebc6c96418068cc56b88a6dc95d7af4..0d686f107701ee7ebe19183281b813f98f5ec4f6 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -75,4 +75,9 @@ public final class EveryXthPrinter<T> extends Stage { return distributor.getInputPorts(); } + @Override + protected boolean isStarted() { + return distributor.isStarted(); + } + } diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 7109b83ddba42678889a377b4967d5060a035787..3ee0a72a15b0adf7dae5839e77602e16c6792189 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -72,4 +72,9 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte return firstStage.getTerminationStrategy(); } + @Override + protected boolean isStarted() { + return firstStage.isStarted(); + } + } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c892f8496822a2dc6ae017aa2060811b2e74c223 --- /dev/null +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -0,0 +1,80 @@ +package teetime.framework; + +import static org.junit.Assert.assertEquals; + +import java.lang.Thread.State; +import java.util.Collection; + +import org.junit.Test; + +import teetime.util.Pair; + +import com.google.common.base.Joiner; + +public class RunnableConsumerStageTest { + + @Test + public void testWaitingInfinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(5000, 1); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); + } + }); + thread.start(); + + Thread.sleep(200); + + assertEquals(State.WAITING, thread.getState()); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testWaitingFinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 1); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); + } + }); + thread.start(); + + Thread.sleep(200); + assertEquals(State.WAITING, thread.getState()); + + Thread.sleep(500); + assertEquals(State.TERMINATED, thread.getState()); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testSimpleRun() throws Exception { + YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + + start(analysis); + + assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + private void start(final Analysis analysis) { + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + for (Pair<Thread, Throwable> pair : exceptions) { + System.out.println(pair.getSecond()); + System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); + } + assertEquals(0, exceptions.size()); + } +} diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..62f9ebff91d56ceb71732836e3649e8e4613a2b0 --- /dev/null +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -0,0 +1,70 @@ +package teetime.framework; + +import teetime.framework.idle.WaitStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.Clock; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; +import teetime.stage.basic.Delay; + +class WaitStrategyConfiguration extends AnalysisConfiguration { + + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private Delay<Object> delay; + private CollectorSink<Object> collectorSink; + + public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + Stage producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(delay); + addThreadableStage(consumer); + + Clock clock = buildClock(initialDelayInMs, delay); + addThreadableStage(clock); + } + + private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { + Clock clock = new Clock(); + clock.setInitialDelayInMs(initialDelayInMs); + + interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + + return clock; + } + + private Stage buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + delay = new Delay<Object>(); + + intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort()); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final Delay<Object> delay) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new WaitStrategy(relay)); + + interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +} diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..57212fd9de8a48e7360ae62e1caa734757e2b178 --- /dev/null +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -0,0 +1,51 @@ +package teetime.framework; + +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; + +class YieldStrategyConfiguration extends AnalysisConfiguration { + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private CollectorSink<Object> collectorSink; + + public YieldStrategyConfiguration(final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + InitialElementProducer<Object> producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(producer); + addThreadableStage(consumer); + } + + private InitialElementProducer<Object> buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final InitialElementProducer<Object> producer) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new YieldStrategy()); + + interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +}