Skip to content
Snippets Groups Projects
Commit bd6cfa6c authored by Christian Wulf's avatar Christian Wulf
Browse files

simplified distributor.dynamic.RemovePortAction

parent 9e5fbfe5
Branches
Tags
No related merge requests found
...@@ -24,11 +24,11 @@ package teetime.framework; ...@@ -24,11 +24,11 @@ package teetime.framework;
* *
* @since 1.2 * @since 1.2
*/ */
public final class DynamicOutputPort<T> extends OutputPort<T> { public class DynamicOutputPort<T> extends OutputPort<T> {
private int index; private int index;
DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { protected DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage, null); super(type, owningStage, null);
this.index = index; this.index = index;
} }
... ...
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.IDistributorStrategy;
import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2;
...@@ -47,7 +48,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { ...@@ -47,7 +48,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element);
} }
public OutputPort<T> getNewOutputPort() { public DynamicOutputPort<T> getNewOutputPort() {
return this.createDynamicOutputPort(); return this.createDynamicOutputPort();
} }
... ...
......
...@@ -15,9 +15,12 @@ ...@@ -15,9 +15,12 @@
*/ */
package teetime.stage.basic.distributor.dynamic; package teetime.stage.basic.distributor.dynamic;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.DynamicActuator; import teetime.framework.DynamicActuator;
import teetime.framework.DynamicOutputPort;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
...@@ -30,18 +33,21 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { ...@@ -30,18 +33,21 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
private final InputPort<T> inputPort; private final InputPort<T> inputPort;
private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>();
public CreatePortAction(final InputPort<T> inputPort) { public CreatePortAction(final InputPort<T> inputPort) {
this.inputPort = inputPort; this.inputPort = inputPort;
} }
@Override @Override
public void execute(final DynamicDistributor<T> dynamicDistributor) { public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); DynamicOutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort();
onOutputPortCreated(newOutputPort); processOutputPort(newOutputPort);
onOutputPortCreated(dynamicDistributor, newOutputPort);
} }
private void onOutputPortCreated(final OutputPort<? extends T> newOutputPort) { private void processOutputPort(final DynamicOutputPort<T> newOutputPort) {
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage());
...@@ -52,7 +58,17 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { ...@@ -52,7 +58,17 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end // FIXME pass the new thread to the analysis so that it can terminate the thread at the end
} }
private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final DynamicOutputPort<T> newOutputPort) {
for (PortActionListener<T> listener : listeners) {
listener.onOutputPortCreated(dynamicDistributor, newOutputPort);
}
}
InputPort<T> getInputPort() { // for testing purposes only InputPort<T> getInputPort() { // for testing purposes only
return inputPort; return inputPort;
} }
public void addPortActionListener(final PortActionListener<T> listener) {
listeners.add(listener);
}
} }
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
public interface PortActionListener<T> {
void onOutputPortCreated(DynamicDistributor<T> distributor, DynamicOutputPort<T> port);
}
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package teetime.stage.basic.distributor.dynamic; package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort; import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.util.framework.port.PortAction; import teetime.util.framework.port.PortAction;
public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> {
...@@ -24,22 +23,14 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { ...@@ -24,22 +23,14 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> {
private final DynamicOutputPort<T> outputPort; private final DynamicOutputPort<T> outputPort;
public RemovePortAction(final DynamicOutputPort<T> outputPort) { public RemovePortAction(final DynamicOutputPort<T> outputPort) {
super(); if (null == outputPort) {
throw new IllegalArgumentException("outputPort may not be null");
}
this.outputPort = outputPort; this.outputPort = outputPort;
} }
@Override @Override
public void execute(final DynamicDistributor<T> dynamicDistributor) { public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<?> outputPortToRemove; dynamicDistributor.removeDynamicPort(outputPort);
if (null == outputPort) {
// for testing purposes only
OutputPort<?>[] outputPorts = ((DynamicDistributor<?>) dynamicDistributor).getOutputPorts();
outputPortToRemove = outputPorts[outputPorts.length - 1];
} else {
outputPortToRemove = outputPort;
}
dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove);
} }
} }
...@@ -26,6 +26,7 @@ import java.util.List; ...@@ -26,6 +26,7 @@ import java.util.List;
import org.junit.Test; import org.junit.Test;
import teetime.framework.Configuration; import teetime.framework.Configuration;
import teetime.framework.DynamicOutputPort;
import teetime.framework.Execution; import teetime.framework.Execution;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
...@@ -59,7 +60,7 @@ public class DynamicDistributorTest { ...@@ -59,7 +60,7 @@ public class DynamicDistributorTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[5]; PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[5];
for (int i = 0; i < inputActions.length; i++) { for (int i = 0; i < inputActions.length; i++) {
PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(); PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(new PortContainer<Integer>());
inputActions[i] = createAction; inputActions[i] = createAction;
} }
...@@ -83,12 +84,17 @@ public class DynamicDistributorTest { ...@@ -83,12 +84,17 @@ public class DynamicDistributorTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[6]; PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[6];
inputActions[0] = createPortCreateAction();
inputActions[1] = new RemovePortAction<Integer>(null); final PortContainer<Integer> portContainer0 = new PortContainer<Integer>();
inputActions[2] = createPortCreateAction(); final PortContainer<Integer> portContainer1 = new PortContainer<Integer>();
inputActions[3] = createPortCreateAction(); final PortContainer<Integer> portContainer2 = new PortContainer<Integer>();
inputActions[4] = new RemovePortAction<Integer>(null);
inputActions[5] = new RemovePortAction<Integer>(null); inputActions[0] = createPortCreateAction(portContainer0);
inputActions[1] = new RemovePortAction<Integer>(portContainer0);
inputActions[2] = createPortCreateAction(portContainer1);
inputActions[3] = createPortCreateAction(portContainer2);
inputActions[4] = new RemovePortAction<Integer>(portContainer1);
inputActions[5] = new RemovePortAction<Integer>(portContainer2);
DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config, Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config,
...@@ -102,9 +108,15 @@ public class DynamicDistributorTest { ...@@ -102,9 +108,15 @@ public class DynamicDistributorTest {
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
} }
private PortAction<DynamicDistributor<Integer>> createPortCreateAction() { private CreatePortAction<Integer> createPortCreateAction(final PortContainer<Integer> portContainer) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>(); CollectorSink<Integer> newStage = new CollectorSink<Integer>();
PortAction<DynamicDistributor<Integer>> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); CreatePortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort());
portAction.addPortActionListener(new PortActionListener<Integer>() {
@Override
public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final DynamicOutputPort<Integer> port) {
portContainer.setPort(port);
}
});
return portAction; return portAction;
} }
... ...
......
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
/**
* Represents a container that eventually holds the output port that a {@link RemovePortAction} can use.
*
* @author Christian Wulf
*
* @param <T>
*/
class PortContainer<T> extends DynamicOutputPort<T> {
private DynamicOutputPort<T> port;
PortContainer() {
super(null, null, -1);
}
@Override
public int getIndex() {
return port.getIndex();
}
public void setPort(final DynamicOutputPort<T> port) {
this.port = port;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment