diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index af9bfb0176a194f146cecd0ae6a990ea21218738..c64759009269e25d8448f01690ea513400b427d5 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -298,6 +298,12 @@ public abstract class AbstractStage extends Stage { return outputPort; } + protected <T> DynamicInputPort<T> createDynamicInputPort() { + final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.length); + inputPorts = addElementToArray(inputPort, inputPorts); + return inputPort; + } + @Override protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { int index = dynamicOutputPort.getIndex(); diff --git a/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java index a440a1cf25ed7d4fb9af8a79671d7c6e58b59578..0f7dcba71ad478a2933fdfebf42610b4e1bd090b 100644 --- a/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java @@ -16,7 +16,7 @@ package teetime.stage.basic.merger; import teetime.framework.InputPort; -import teetime.framework.NotEnoughInputException; +import teetime.framework.Stage; /** * @author Christian Wulf @@ -31,6 +31,9 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { public <T> T getNextInput(final Merger<T> merger) { final InputPort<T>[] inputPorts = merger.getInputPorts(); final InputPort<T> inputPort = getOpenInputPort(inputPorts); + if (null == inputPort) { + return null; + } final T token = inputPort.receive(); if (null != token) { @@ -47,7 +50,7 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { while (inputPort.isClosed()) { this.index = (this.index + 1) % inputPorts.length; if (index == startedIndex) { - throw new NotEnoughInputException(); + return null; } inputPort = inputPorts[this.index]; } @@ -55,4 +58,10 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { return inputPort; } + @Override + public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) stage; + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().length; + } } diff --git a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java index 10e22f807ba56df122815d215b8436e6be2ee93c..4fa1b4dab399ff94cf3e5eb7e5536847b5beaa0e 100644 --- a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java @@ -15,12 +15,14 @@ */ package teetime.stage.basic.merger; +import teetime.framework.InputPortRemovedListener; + /** * @author Nils Christian Ehmke * * @since 1.0 */ -public interface IMergerStrategy { +public interface IMergerStrategy extends InputPortRemovedListener { public <T> T getNextInput(Merger<T> merger); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index df36520de40757208d871bfb8511c012589ef4fb..3e37f472727d447ad78990d2ddf96ab69af9df17 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import teetime.framework.AbstractStage; +import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; @@ -51,6 +52,7 @@ public class Merger<T> extends AbstractStage { public Merger(final IMergerStrategy strategy) { this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); this.strategy = strategy; + addInputPortRemovedListener(strategy); } @Override @@ -108,8 +110,8 @@ public class Merger<T> extends AbstractStage { return (InputPort<T>[]) super.getInputPorts(); } - public InputPort<T> getNewInputPort() { - return this.createInputPort(); + public DynamicInputPort<T> getNewInputPort() { + return this.createDynamicInputPort(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java index 89dee9a14b0f7b5efda942bd9d241450360cb5fa..d554e0acd6123dc4692c875c76f9cc52ee4e9638 100644 --- a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java @@ -16,6 +16,7 @@ package teetime.stage.basic.merger; import teetime.framework.InputPort; +import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -49,4 +50,11 @@ public final class RoundRobinStrategy implements IMergerStrategy { return inputPort; } + @Override + public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) stage; + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().length; + } + } diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java index ce7016c51f40785a7f35c54986955c056b6883a8..4e70a825c008feb7512c8e87ba3baa22cfb61cce 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java @@ -35,4 +35,5 @@ public class DynamicMerger<T> extends Merger<T> { public boolean addPortActionRequest(final PortAction<DynamicMerger<T>> newPortActionRequest) { return portActions.offer(newPortActionRequest); } + } diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java index 0195625176d3caa417884d97513ff749dfeaab9c..0bef9337f7ee1f1af22405c9be672626bce88c03 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -17,13 +17,13 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { public void execute(final DynamicMerger<T> dynamicMerger) { InputPort<?> inputPortsToRemove; - // if (dynamicMerger instanceof ControlledDynamicMerger) { - // // for testing purposes only - // InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts(); - // inputPortsToRemove = inputPorts[inputPorts.length - 1]; - // } else { - inputPortsToRemove = inputPort; - // } + if (null == inputPort) { + // for testing purposes only + InputPort<?>[] inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); + inputPortsToRemove = inputPorts[inputPorts.length - 1]; + } else { + inputPortsToRemove = inputPort; + } dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove); } diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 9f49fad6ad4bf931263e65cf33ad71a1e59a0743..ee7dbf57f2f57ea530137f49ec3941f5be350a29 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -1,7 +1,6 @@ package teetime.stage.basic.merger.dynamic; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -59,32 +58,29 @@ public class DynamicMergerTest { analysis.executeBlocking(); - assertThat(config.getOutputElements(), containsInAnyOrder(0, 1, 2, 3, 4, 5, 6)); + assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4, 5, 6)); } @Test public void shouldWorkWithRemoveActionTriggers() throws Exception { - List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5); + List<Integer> inputNumbers = Arrays.asList(0, 1, 2); @SuppressWarnings("unchecked") PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6]; - // inputActions[0] = createPortCreateAction(); - // inputActions[1] = new RemovePortAction<Integer>(null); - // inputActions[2] = createPortCreateAction(); - // inputActions[3] = createPortCreateAction(); - // inputActions[4] = new RemovePortAction<Integer>(null); - // inputActions[5] = new RemovePortAction<Integer>(null); - // - // ControlledMergerTestConfig<Integer> config = new ControlledMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); - // Execution<ControlledMergerTestConfig<Integer>> analysis = new Execution<ControlledMergerTestConfig<Integer>>(config, - // new TerminatingExceptionListenerFactory()); - // - // analysis.executeBlocking(); - // - // assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); - // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0); - // assertValuesForIndex(inputActions, Arrays.asList(3), 2); - // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); + inputActions[0] = createPortCreateAction(3); + inputActions[1] = new RemovePortAction<Integer>(null); + inputActions[2] = createPortCreateAction(4); + inputActions[3] = createPortCreateAction(5); + inputActions[4] = new RemovePortAction<Integer>(null); + inputActions[5] = new RemovePortAction<Integer>(null); + + DynamicMergerTestConfig<Integer> config = new DynamicMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Execution<DynamicMergerTestConfig<Integer>> analysis = new Execution<DynamicMergerTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); + + analysis.executeBlocking(); + + assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); } private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) {