diff --git a/src/main/java/teetime/framework/AnalysisContext.java b/src/main/java/teetime/framework/AnalysisContext.java index 67348213fad569acb04b68f84eca90cc7d64fd1d..5c7633676845878c0e2c0e049fa80285b4ef2e76 100644 --- a/src/main/java/teetime/framework/AnalysisContext.java +++ b/src/main/java/teetime/framework/AnalysisContext.java @@ -192,6 +192,9 @@ public abstract class AnalysisContext extends Network { */ @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + if (sourcePort.getOwningStage().getInputPorts().length == 0) { + addThreadableStage(sourcePort.getOwningStage()); + } new InstantiationPipe(sourcePort, targetPort, capacity); } diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index fd15835d21f2516b9921ad421abd9b36ba8097af..a77c101a2c00eeb7bab460654238a2a0b1cb9f8c 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -150,4 +150,33 @@ public class AnalysisTest { } } + @Test + public void automaticallyAddHeadStages() { + AutomaticallyConfig context = new AutomaticallyConfig(); + new Analysis<AnalysisContext>(context).executeBlocking(); + assertTrue(context.executed); + } + + private class AutomaticallyConfig extends AnalysisContext { + + public boolean executed; + + public AutomaticallyConfig() { + AutomaticallyAddedStage aas = new AutomaticallyAddedStage(); + Sink<Object> sink = new Sink<Object>(); + connectPorts(aas.getOutputPort(), sink.getInputPort()); + } + + private class AutomaticallyAddedStage extends AbstractProducerStage<Object> { + + @Override + protected void execute() { + executed = true; + terminate(); + } + + } + + } + }