From bd6cfa6c48cb46b4ea4b022999761859270bab58 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 2 Jul 2015 17:17:34 +0200 Subject: [PATCH] simplified distributor.dynamic.RemovePortAction --- .../teetime/framework/DynamicOutputPort.java | 4 +-- .../stage/basic/distributor/Distributor.java | 3 +- .../distributor/dynamic/CreatePortAction.java | 24 ++++++++++++--- .../dynamic/PortActionListener.java | 8 +++++ .../distributor/dynamic/RemovePortAction.java | 17 +++-------- .../dynamic/DynamicDistributorTest.java | 30 +++++++++++++------ .../distributor/dynamic/PortContainer.java | 29 ++++++++++++++++++ 7 files changed, 86 insertions(+), 29 deletions(-) create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java create mode 100644 src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java diff --git a/src/main/java/teetime/framework/DynamicOutputPort.java b/src/main/java/teetime/framework/DynamicOutputPort.java index ef84e209..a84bd9c4 100644 --- a/src/main/java/teetime/framework/DynamicOutputPort.java +++ b/src/main/java/teetime/framework/DynamicOutputPort.java @@ -24,11 +24,11 @@ package teetime.framework; * * @since 1.2 */ -public final class DynamicOutputPort<T> extends OutputPort<T> { +public class DynamicOutputPort<T> extends OutputPort<T> { private int index; - DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { + protected DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { super(type, owningStage, null); this.index = index; } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index a9813bcd..cae21532 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -16,6 +16,7 @@ package teetime.stage.basic.distributor; import teetime.framework.AbstractConsumerStage; +import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; @@ -47,7 +48,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); } - public OutputPort<T> getNewOutputPort() { + public DynamicOutputPort<T> getNewOutputPort() { return this.createDynamicOutputPort(); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 0727cdd7..1e04ca08 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -15,9 +15,12 @@ */ package teetime.stage.basic.distributor.dynamic; +import java.util.ArrayList; +import java.util.List; + import teetime.framework.DynamicActuator; +import teetime.framework.DynamicOutputPort; import teetime.framework.InputPort; -import teetime.framework.OutputPort; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; @@ -30,18 +33,21 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { private final InputPort<T> inputPort; + private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>(); + public CreatePortAction(final InputPort<T> inputPort) { this.inputPort = inputPort; } @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); + DynamicOutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort(); - onOutputPortCreated(newOutputPort); + processOutputPort(newOutputPort); + onOutputPortCreated(dynamicDistributor, newOutputPort); } - private void onOutputPortCreated(final OutputPort<? extends T> newOutputPort) { + private void processOutputPort(final DynamicOutputPort<T> newOutputPort) { INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); @@ -52,7 +58,17 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { // FIXME pass the new thread to the analysis so that it can terminate the thread at the end } + private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final DynamicOutputPort<T> newOutputPort) { + for (PortActionListener<T> listener : listeners) { + listener.onOutputPortCreated(dynamicDistributor, newOutputPort); + } + } + InputPort<T> getInputPort() { // for testing purposes only return inputPort; } + + public void addPortActionListener(final PortActionListener<T> listener) { + listeners.add(listener); + } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java new file mode 100644 index 00000000..10cd6c47 --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java @@ -0,0 +1,8 @@ +package teetime.stage.basic.distributor.dynamic; + +import teetime.framework.DynamicOutputPort; + +public interface PortActionListener<T> { + + void onOutputPortCreated(DynamicDistributor<T> distributor, DynamicOutputPort<T> port); +} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 3387fd57..461d3fdb 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -16,7 +16,6 @@ package teetime.stage.basic.distributor.dynamic; import teetime.framework.DynamicOutputPort; -import teetime.framework.OutputPort; import teetime.util.framework.port.PortAction; public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { @@ -24,22 +23,14 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { private final DynamicOutputPort<T> outputPort; public RemovePortAction(final DynamicOutputPort<T> outputPort) { - super(); + if (null == outputPort) { + throw new IllegalArgumentException("outputPort may not be null"); + } this.outputPort = outputPort; } @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - OutputPort<?> outputPortToRemove; - - if (null == outputPort) { - // for testing purposes only - OutputPort<?>[] outputPorts = ((DynamicDistributor<?>) dynamicDistributor).getOutputPorts(); - outputPortToRemove = outputPorts[outputPorts.length - 1]; - } else { - outputPortToRemove = outputPort; - } - - dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove); + dynamicDistributor.removeDynamicPort(outputPort); } } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 8d45c576..c62002f7 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -26,6 +26,7 @@ import java.util.List; import org.junit.Test; import teetime.framework.Configuration; +import teetime.framework.DynamicOutputPort; import teetime.framework.Execution; import teetime.framework.Stage; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; @@ -59,7 +60,7 @@ public class DynamicDistributorTest { @SuppressWarnings("unchecked") PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[5]; for (int i = 0; i < inputActions.length; i++) { - PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(); + PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(new PortContainer<Integer>()); inputActions[i] = createAction; } @@ -83,12 +84,17 @@ public class DynamicDistributorTest { @SuppressWarnings("unchecked") PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[6]; - inputActions[0] = createPortCreateAction(); - inputActions[1] = new RemovePortAction<Integer>(null); - inputActions[2] = createPortCreateAction(); - inputActions[3] = createPortCreateAction(); - inputActions[4] = new RemovePortAction<Integer>(null); - inputActions[5] = new RemovePortAction<Integer>(null); + + final PortContainer<Integer> portContainer0 = new PortContainer<Integer>(); + final PortContainer<Integer> portContainer1 = new PortContainer<Integer>(); + final PortContainer<Integer> portContainer2 = new PortContainer<Integer>(); + + inputActions[0] = createPortCreateAction(portContainer0); + inputActions[1] = new RemovePortAction<Integer>(portContainer0); + inputActions[2] = createPortCreateAction(portContainer1); + inputActions[3] = createPortCreateAction(portContainer2); + inputActions[4] = new RemovePortAction<Integer>(portContainer1); + inputActions[5] = new RemovePortAction<Integer>(portContainer2); DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, @@ -102,9 +108,15 @@ public class DynamicDistributorTest { assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); } - private PortAction<DynamicDistributor<Integer>> createPortCreateAction() { + private CreatePortAction<Integer> createPortCreateAction(final PortContainer<Integer> portContainer) { CollectorSink<Integer> newStage = new CollectorSink<Integer>(); - PortAction<DynamicDistributor<Integer>> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + CreatePortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + portAction.addPortActionListener(new PortActionListener<Integer>() { + @Override + public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final DynamicOutputPort<Integer> port) { + portContainer.setPort(port); + } + }); return portAction; } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java new file mode 100644 index 00000000..8c354995 --- /dev/null +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java @@ -0,0 +1,29 @@ +package teetime.stage.basic.distributor.dynamic; + +import teetime.framework.DynamicOutputPort; + +/** + * Represents a container that eventually holds the output port that a {@link RemovePortAction} can use. + * + * @author Christian Wulf + * + * @param <T> + */ +class PortContainer<T> extends DynamicOutputPort<T> { + + private DynamicOutputPort<T> port; + + PortContainer() { + super(null, null, -1); + } + + @Override + public int getIndex() { + return port.getIndex(); + } + + public void setPort(final DynamicOutputPort<T> port) { + this.port = port; + } + +} -- GitLab