diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 444bafaabadb2bfb28cd11b106be3098263d8a0f..483b1ecbdb93ee23cfd9fec89906d947ba1d9787 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -55,7 +55,9 @@ public final class ConfigurationContext { */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().length == 0) { - addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + if (!threadableStages.containsKey(sourcePort.getOwningStage())) { + addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + } } if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { LOGGER.warn("Overwriting existing pipe while connecting stages " + diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index 028513cf2a29be48d60d00445efb186fade57d70..bc7d850c158edd47da941c1f9bbea2dbd896ec95 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -176,4 +176,23 @@ public class ExecutionTest { } + @Test + public void threadNameing() { + NameConfig configuration = new NameConfig(); + Execution<NameConfig> execution = new Execution<NameConfig>(configuration); + assertThat(configuration.stageWithNamedThread.getOwningThread().getName(), is("TestName")); + } + + private class NameConfig extends Configuration { + + public InitialElementProducer<Object> stageWithNamedThread; + + public NameConfig() { + stageWithNamedThread = new InitialElementProducer<Object>(new Object()); + addThreadableStage(stageWithNamedThread, "TestName"); + connectPorts(stageWithNamedThread.getOutputPort(), new Sink().getInputPort()); + } + + } + }