diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java deleted file mode 100644 index 4d07965154b33cda8b0b10742e2fd32d4826146b..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java +++ /dev/null @@ -1,21 +0,0 @@ -package teetime.stage.basic.distributor.dynamic; - -import teetime.util.framework.port.PortActionHelper; - -class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { - - @Override - protected void checkForPendingPortActionRequest() { - try { - PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - - // @Override - // protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes - // return super.getOutputPorts(); - // } - -} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index 72cf755a31c17c10e58cee81ebcd5277446faabd..b068f4df2384342114e8061b8099c5dec792290e 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -8,6 +8,8 @@ import teetime.framework.OutputPortRemovedListener; import teetime.framework.Stage; import teetime.framework.signal.TerminatingSignal; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.basic.distributor.IDistributorStrategy; +import teetime.stage.basic.distributor.RoundRobinStrategy2; import teetime.util.framework.port.PortAction; import teetime.util.framework.port.PortActionHelper; @@ -15,7 +17,15 @@ public class DynamicDistributor<T> extends Distributor<T> implements OutputPortR protected final BlockingQueue<PortAction<DynamicDistributor<T>>> portActions; + /** + * Uses {@link RoundRobinStrategy2} as default distributor strategy. + */ public DynamicDistributor() { + this(new RoundRobinStrategy2()); + } + + public DynamicDistributor(final IDistributorStrategy strategy) { + super(strategy); this.portActions = PortActionHelper.createPortActionQueue(); addOutputPortRemovedListener(this); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 345f5c59e0c41096284cd9745d74a674f11e1ca5..866b82780be6fb58b96206ca924724fff0f8b749 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -17,9 +17,9 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { public void execute(final DynamicDistributor<T> dynamicDistributor) { OutputPort<?> outputPortToRemove; - if (dynamicDistributor instanceof ControlledDynamicDistributor) { + if (null == outputPort) { // for testing purposes only - OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts(); + OutputPort<?>[] outputPorts = ((DynamicDistributor<?>) dynamicDistributor).getOutputPorts(); outputPortToRemove = outputPorts[outputPorts.length - 1]; } else { outputPortToRemove = outputPort; diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java index 28262b674daa340af34a8625e7fabc44be6d5cd4..86667178dbcf7f1866c38f6b661a5c3eb4858b07 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java @@ -28,8 +28,8 @@ public class ControlledDistributorTest { @SuppressWarnings("unchecked") List<PortAction<DynamicDistributor<Integer>>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); - ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions); - Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config, + DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, inputActions); + Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, new TerminatingExceptionListenerFactory()); analysis.executeBlocking(); @@ -48,8 +48,8 @@ public class ControlledDistributorTest { inputActions[i] = createAction; } - ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); - Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config, + DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, new TerminatingExceptionListenerFactory()); analysis.executeBlocking(); @@ -75,8 +75,8 @@ public class ControlledDistributorTest { inputActions[4] = new RemovePortAction<Integer>(null); inputActions[5] = new RemovePortAction<Integer>(null); - ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); - Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config, + DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, new TerminatingExceptionListenerFactory()); analysis.executeBlocking(); @@ -102,13 +102,13 @@ public class ControlledDistributorTest { assertThat(collectorSink.getElements(), is(values)); } - private static class ControlledDistributorTestConfig<T> extends Configuration { + private static class DynamicDistributorTestConfig<T> extends Configuration { private final CollectorSink<T> collectorSink; - public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<DynamicDistributor<T>>> inputActions) { + public DynamicDistributorTestConfig(final List<T> elements, final List<PortAction<DynamicDistributor<T>>> inputActions) { InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); - DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>(); + DynamicDistributor<T> distributor = new DynamicDistributor<T>(); collectorSink = new CollectorSink<T>(); connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());