From 98b2edb632ccc93160c37cf7868fb73c9e8ac705 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Sat, 27 Jun 2015 23:29:02 +0200 Subject: [PATCH] fixed dynamic merger test --- .../java/teetime/framework/AbstractStage.java | 6 ++++ .../merger/BusyWaitingRoundRobinStrategy.java | 13 +++++-- .../stage/basic/merger/IMergerStrategy.java | 4 ++- .../teetime/stage/basic/merger/Merger.java | 6 ++-- .../basic/merger/RoundRobinStrategy.java | 8 +++++ .../basic/merger/dynamic/DynamicMerger.java | 1 + .../merger/dynamic/RemovePortAction.java | 14 ++++---- .../merger/dynamic/DynamicMergerTest.java | 36 +++++++++---------- 8 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index af9bfb01..c6475900 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -298,6 +298,12 @@ public abstract class AbstractStage extends Stage { return outputPort; } + protected <T> DynamicInputPort<T> createDynamicInputPort() { + final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.length); + inputPorts = addElementToArray(inputPort, inputPorts); + return inputPort; + } + @Override protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { int index = dynamicOutputPort.getIndex(); diff --git a/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java index a440a1cf..0f7dcba7 100644 --- a/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java @@ -16,7 +16,7 @@ package teetime.stage.basic.merger; import teetime.framework.InputPort; -import teetime.framework.NotEnoughInputException; +import teetime.framework.Stage; /** * @author Christian Wulf @@ -31,6 +31,9 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { public <T> T getNextInput(final Merger<T> merger) { final InputPort<T>[] inputPorts = merger.getInputPorts(); final InputPort<T> inputPort = getOpenInputPort(inputPorts); + if (null == inputPort) { + return null; + } final T token = inputPort.receive(); if (null != token) { @@ -47,7 +50,7 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { while (inputPort.isClosed()) { this.index = (this.index + 1) % inputPorts.length; if (index == startedIndex) { - throw new NotEnoughInputException(); + return null; } inputPort = inputPorts[this.index]; } @@ -55,4 +58,10 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { return inputPort; } + @Override + public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) stage; + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().length; + } } diff --git a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java index 10e22f80..4fa1b4da 100644 --- a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java @@ -15,12 +15,14 @@ */ package teetime.stage.basic.merger; +import teetime.framework.InputPortRemovedListener; + /** * @author Nils Christian Ehmke * * @since 1.0 */ -public interface IMergerStrategy { +public interface IMergerStrategy extends InputPortRemovedListener { public <T> T getNextInput(Merger<T> merger); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index df36520d..3e37f472 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import teetime.framework.AbstractStage; +import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; @@ -51,6 +52,7 @@ public class Merger<T> extends AbstractStage { public Merger(final IMergerStrategy strategy) { this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); this.strategy = strategy; + addInputPortRemovedListener(strategy); } @Override @@ -108,8 +110,8 @@ public class Merger<T> extends AbstractStage { return (InputPort<T>[]) super.getInputPorts(); } - public InputPort<T> getNewInputPort() { - return this.createInputPort(); + public DynamicInputPort<T> getNewInputPort() { + return this.createDynamicInputPort(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java index 89dee9a1..d554e0ac 100644 --- a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java @@ -16,6 +16,7 @@ package teetime.stage.basic.merger; import teetime.framework.InputPort; +import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -49,4 +50,11 @@ public final class RoundRobinStrategy implements IMergerStrategy { return inputPort; } + @Override + public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) stage; + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().length; + } + } diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java index ce7016c5..4e70a825 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java @@ -35,4 +35,5 @@ public class DynamicMerger<T> extends Merger<T> { public boolean addPortActionRequest(final PortAction<DynamicMerger<T>> newPortActionRequest) { return portActions.offer(newPortActionRequest); } + } diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java index 01956251..0bef9337 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -17,13 +17,13 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { public void execute(final DynamicMerger<T> dynamicMerger) { InputPort<?> inputPortsToRemove; - // if (dynamicMerger instanceof ControlledDynamicMerger) { - // // for testing purposes only - // InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts(); - // inputPortsToRemove = inputPorts[inputPorts.length - 1]; - // } else { - inputPortsToRemove = inputPort; - // } + if (null == inputPort) { + // for testing purposes only + InputPort<?>[] inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); + inputPortsToRemove = inputPorts[inputPorts.length - 1]; + } else { + inputPortsToRemove = inputPort; + } dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove); } diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 9f49fad6..ee7dbf57 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -1,7 +1,6 @@ package teetime.stage.basic.merger.dynamic; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -59,32 +58,29 @@ public class DynamicMergerTest { analysis.executeBlocking(); - assertThat(config.getOutputElements(), containsInAnyOrder(0, 1, 2, 3, 4, 5, 6)); + assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4, 5, 6)); } @Test public void shouldWorkWithRemoveActionTriggers() throws Exception { - List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5); + List<Integer> inputNumbers = Arrays.asList(0, 1, 2); @SuppressWarnings("unchecked") PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6]; - // inputActions[0] = createPortCreateAction(); - // inputActions[1] = new RemovePortAction<Integer>(null); - // inputActions[2] = createPortCreateAction(); - // inputActions[3] = createPortCreateAction(); - // inputActions[4] = new RemovePortAction<Integer>(null); - // inputActions[5] = new RemovePortAction<Integer>(null); - // - // ControlledMergerTestConfig<Integer> config = new ControlledMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); - // Execution<ControlledMergerTestConfig<Integer>> analysis = new Execution<ControlledMergerTestConfig<Integer>>(config, - // new TerminatingExceptionListenerFactory()); - // - // analysis.executeBlocking(); - // - // assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); - // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0); - // assertValuesForIndex(inputActions, Arrays.asList(3), 2); - // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); + inputActions[0] = createPortCreateAction(3); + inputActions[1] = new RemovePortAction<Integer>(null); + inputActions[2] = createPortCreateAction(4); + inputActions[3] = createPortCreateAction(5); + inputActions[4] = new RemovePortAction<Integer>(null); + inputActions[5] = new RemovePortAction<Integer>(null); + + DynamicMergerTestConfig<Integer> config = new DynamicMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Execution<DynamicMergerTestConfig<Integer>> analysis = new Execution<DynamicMergerTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); + + analysis.executeBlocking(); + + assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); } private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) { -- GitLab