diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 2a9f0e79e031689c559d81ee8223d28baf26a5de..59b8a5ef242a301beb788f1ba142c7fde144d365 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -28,6 +28,9 @@ public abstract class AbstractCompositeStage extends Configuration { private final ConfigurationContext context; public AbstractCompositeStage(final ConfigurationContext context) { + if (null == context) { + throw new IllegalArgumentException("Context may not be null."); + } this.context = context; } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index a9acaebaeb676b792c369d2f588efad2fcbe24e1..4e2f5f6acff99a2f951db19ec8f93e6a000fab7e 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -202,9 +202,9 @@ public abstract class ConfigurationContext extends Configuration { if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage()); } - if (sourcePort.pipe != null) { - LOGGER.warn("Overwritting existing pipe while connecting stages " + sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() - + "."); + if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { + LOGGER.warn("Overwriting existing pipe while connecting stages " + + sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); } new InstantiationPipe(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index 64e6dbf21689280e9f0c9e95444374e2aefde725..b922f15ec7978fa255fa9aae4d8c2a4d99ebe6dd 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -21,13 +21,16 @@ import teetime.framework.signal.ISignal; public class InstantiationPipe implements IPipe { - private final InputPort<?> target; + private static final String ERROR_MESSAGE = "This must not be called while executing the configuration"; + + private final InputPort<?> targetPort; private final int capacity; public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - this.target = targetPort; + this.targetPort = targetPort; this.capacity = capacity; sourcePort.setPipe(this); + targetPort.setPipe(this); } public int getCapacity() { @@ -36,67 +39,67 @@ public class InstantiationPipe implements IPipe { @Override public InputPort<?> getTargetPort() { - return this.target; + return this.targetPort; } @Override public boolean add(final Object element) { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean addNonBlocking(final Object element) { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean isEmpty() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public int size() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public Object removeLast() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void sendSignal(final ISignal signal) { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void reportNewElement() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean isClosed() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean hasMore() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void waitForStartSignal() throws InterruptedException { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void waitForInitializingSignal() throws InterruptedException { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void close() { - throw new IllegalStateException("This must not be called while executing the configuration"); + throw new IllegalStateException(ERROR_MESSAGE); } }