diff --git a/src/main/java/teetime/framework/DynamicOutputPort.java b/src/main/java/teetime/framework/DynamicOutputPort.java index ef84e209bbdd08f187599194f9e0eddf8433c0d8..a84bd9c4ec6c23eea75ceb1564c134c341cd0c32 100644 --- a/src/main/java/teetime/framework/DynamicOutputPort.java +++ b/src/main/java/teetime/framework/DynamicOutputPort.java @@ -24,11 +24,11 @@ package teetime.framework; * * @since 1.2 */ -public final class DynamicOutputPort<T> extends OutputPort<T> { +public class DynamicOutputPort<T> extends OutputPort<T> { private int index; - DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { + protected DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { super(type, owningStage, null); this.index = index; } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index a9813bcd01f484c7d3f794eeb173b545e34825e2..cae215321a0c9dc885e0f46e524cb53beb5de1ef 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -16,6 +16,7 @@ package teetime.stage.basic.distributor; import teetime.framework.AbstractConsumerStage; +import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; @@ -47,7 +48,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); } - public OutputPort<T> getNewOutputPort() { + public DynamicOutputPort<T> getNewOutputPort() { return this.createDynamicOutputPort(); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 0727cdd75f1c6c0bf6d68cb336d754cce5e36b9e..1e04ca08fc7a7e9b469a7611ee26f4dc4946698a 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -15,9 +15,12 @@ */ package teetime.stage.basic.distributor.dynamic; +import java.util.ArrayList; +import java.util.List; + import teetime.framework.DynamicActuator; +import teetime.framework.DynamicOutputPort; import teetime.framework.InputPort; -import teetime.framework.OutputPort; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; @@ -30,18 +33,21 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { private final InputPort<T> inputPort; + private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>(); + public CreatePortAction(final InputPort<T> inputPort) { this.inputPort = inputPort; } @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); + DynamicOutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort(); - onOutputPortCreated(newOutputPort); + processOutputPort(newOutputPort); + onOutputPortCreated(dynamicDistributor, newOutputPort); } - private void onOutputPortCreated(final OutputPort<? extends T> newOutputPort) { + private void processOutputPort(final DynamicOutputPort<T> newOutputPort) { INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); @@ -52,7 +58,17 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { // FIXME pass the new thread to the analysis so that it can terminate the thread at the end } + private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final DynamicOutputPort<T> newOutputPort) { + for (PortActionListener<T> listener : listeners) { + listener.onOutputPortCreated(dynamicDistributor, newOutputPort); + } + } + InputPort<T> getInputPort() { // for testing purposes only return inputPort; } + + public void addPortActionListener(final PortActionListener<T> listener) { + listeners.add(listener); + } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java new file mode 100644 index 0000000000000000000000000000000000000000..10cd6c4715afdc90d442da21d0af953a414b2cf5 --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java @@ -0,0 +1,8 @@ +package teetime.stage.basic.distributor.dynamic; + +import teetime.framework.DynamicOutputPort; + +public interface PortActionListener<T> { + + void onOutputPortCreated(DynamicDistributor<T> distributor, DynamicOutputPort<T> port); +} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 3387fd57bba1143a6c0d5d0d423afeab8a98a0f1..461d3fdba022d1609d5fba2e52c25cb50d41ccd1 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -16,7 +16,6 @@ package teetime.stage.basic.distributor.dynamic; import teetime.framework.DynamicOutputPort; -import teetime.framework.OutputPort; import teetime.util.framework.port.PortAction; public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { @@ -24,22 +23,14 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { private final DynamicOutputPort<T> outputPort; public RemovePortAction(final DynamicOutputPort<T> outputPort) { - super(); + if (null == outputPort) { + throw new IllegalArgumentException("outputPort may not be null"); + } this.outputPort = outputPort; } @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - OutputPort<?> outputPortToRemove; - - if (null == outputPort) { - // for testing purposes only - OutputPort<?>[] outputPorts = ((DynamicDistributor<?>) dynamicDistributor).getOutputPorts(); - outputPortToRemove = outputPorts[outputPorts.length - 1]; - } else { - outputPortToRemove = outputPort; - } - - dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove); + dynamicDistributor.removeDynamicPort(outputPort); } } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 8d45c5767bde6cee829c5354272534308765c43d..c62002f798e8c85ed5bfe29d6d8fecf64a5acef0 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -26,6 +26,7 @@ import java.util.List; import org.junit.Test; import teetime.framework.Configuration; +import teetime.framework.DynamicOutputPort; import teetime.framework.Execution; import teetime.framework.Stage; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; @@ -59,7 +60,7 @@ public class DynamicDistributorTest { @SuppressWarnings("unchecked") PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[5]; for (int i = 0; i < inputActions.length; i++) { - PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(); + PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(new PortContainer<Integer>()); inputActions[i] = createAction; } @@ -83,12 +84,17 @@ public class DynamicDistributorTest { @SuppressWarnings("unchecked") PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[6]; - inputActions[0] = createPortCreateAction(); - inputActions[1] = new RemovePortAction<Integer>(null); - inputActions[2] = createPortCreateAction(); - inputActions[3] = createPortCreateAction(); - inputActions[4] = new RemovePortAction<Integer>(null); - inputActions[5] = new RemovePortAction<Integer>(null); + + final PortContainer<Integer> portContainer0 = new PortContainer<Integer>(); + final PortContainer<Integer> portContainer1 = new PortContainer<Integer>(); + final PortContainer<Integer> portContainer2 = new PortContainer<Integer>(); + + inputActions[0] = createPortCreateAction(portContainer0); + inputActions[1] = new RemovePortAction<Integer>(portContainer0); + inputActions[2] = createPortCreateAction(portContainer1); + inputActions[3] = createPortCreateAction(portContainer2); + inputActions[4] = new RemovePortAction<Integer>(portContainer1); + inputActions[5] = new RemovePortAction<Integer>(portContainer2); DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, @@ -102,9 +108,15 @@ public class DynamicDistributorTest { assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); } - private PortAction<DynamicDistributor<Integer>> createPortCreateAction() { + private CreatePortAction<Integer> createPortCreateAction(final PortContainer<Integer> portContainer) { CollectorSink<Integer> newStage = new CollectorSink<Integer>(); - PortAction<DynamicDistributor<Integer>> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + CreatePortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + portAction.addPortActionListener(new PortActionListener<Integer>() { + @Override + public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final DynamicOutputPort<Integer> port) { + portContainer.setPort(port); + } + }); return portAction; } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java new file mode 100644 index 0000000000000000000000000000000000000000..8c35499536150d582e062ccb714a36aa2fb8c044 --- /dev/null +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java @@ -0,0 +1,29 @@ +package teetime.stage.basic.distributor.dynamic; + +import teetime.framework.DynamicOutputPort; + +/** + * Represents a container that eventually holds the output port that a {@link RemovePortAction} can use. + * + * @author Christian Wulf + * + * @param <T> + */ +class PortContainer<T> extends DynamicOutputPort<T> { + + private DynamicOutputPort<T> port; + + PortContainer() { + super(null, null, -1); + } + + @Override + public int getIndex() { + return port.getIndex(); + } + + public void setPort(final DynamicOutputPort<T> port) { + this.port = port; + } + +}