diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 0aa030eda5cb4dd67fe6058add464a978be1508a..27488c2d18ea3c274d9764e027d05c57c81d2d3c 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -15,6 +15,7 @@ */ package teetime.framework; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -35,6 +36,8 @@ public abstract class AbstractStage extends Stage { private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; private StageState currentState = StageState.CREATED; + private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>(); + @Override public InputPort<?>[] getInputPorts() { return inputPorts; @@ -311,8 +314,8 @@ public abstract class AbstractStage extends Stage { @Override protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { int index = dynamicOutputPort.getIndex(); - List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts); - tempOutputPorts.remove(index); + List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts)); + OutputPort<?> removedOutputPort = tempOutputPorts.remove(index); for (int i = index; i < tempOutputPorts.size(); i++) { OutputPort<?> outputPort = tempOutputPorts.get(i); if (outputPort instanceof DynamicOutputPort) { @@ -320,6 +323,18 @@ public abstract class AbstractStage extends Stage { } } outputPorts = tempOutputPorts.toArray(new OutputPort[0]); + + fireOutputPortRemoved(removedOutputPort); + } + + private void fireOutputPortRemoved(final OutputPort<?> removedOutputPort) { + for (OutputPortRemovedListener listener : outputPortRemovedListeners) { + listener.onOutputPortRemoved(this, removedOutputPort); + } + } + + protected void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) { + outputPortRemovedListeners.add(outputPortRemovedListener); } } diff --git a/src/main/java/teetime/framework/OutputPortRemovedListener.java b/src/main/java/teetime/framework/OutputPortRemovedListener.java new file mode 100644 index 0000000000000000000000000000000000000000..85a71b7ae561bc91fc07d8c5dc264b908d5d97bd --- /dev/null +++ b/src/main/java/teetime/framework/OutputPortRemovedListener.java @@ -0,0 +1,7 @@ +package teetime.framework; + +public interface OutputPortRemovedListener { + + void onOutputPortRemoved(Stage stage, OutputPort<?> removedOutputPort); + +} diff --git a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java index 7baf292a0da0e72a2f5b92977a1efc1974840ea6..c41d4177f0510ddfb39a762eeb38885f2a85ed76 100644 --- a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import teetime.framework.OutputPort; +import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -111,4 +112,9 @@ public final class CloneStrategy implements IDistributorStrategy { return null; } + @Override + public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + // do nothing + } + } diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java index 537aa22dc2ecc266f94001e818a646eb8b350b20..317a6eae0fbebe980227c64805e5d3f4e228f83e 100644 --- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java @@ -16,6 +16,7 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -33,4 +34,8 @@ public final class CopyByReferenceStrategy implements IDistributorStrategy { return true; } + @Override + public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + // do nothing + } } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index 485fb876e81f17fb95ac99e6addf9a8c9e1b3f93..f780e247c0d54e4638ad69b0ccf80b155fb18301 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -36,6 +36,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { public Distributor(final IDistributorStrategy strategy) { this.strategy = strategy; + addOutputPortRemovedListener(strategy); } @SuppressWarnings("unchecked") @@ -45,7 +46,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { } public OutputPort<T> getNewOutputPort() { - return this.createOutputPort(); + return this.createDynamicOutputPort(); } public IDistributorStrategy getStrategy() { @@ -56,4 +57,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> { this.strategy = strategy; } + @Override + public OutputPort<?>[] getOutputPorts() { + return super.getOutputPorts(); + } + } diff --git a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java index 9a404d9dc34afbcb80e7c1401863fef129fe9bd2..4e42dc15d3910150c878dc25e451abc1a19f137e 100644 --- a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java @@ -16,13 +16,14 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.framework.OutputPortRemovedListener; /** * @author Nils Christian Ehmke * * @since 1.0 */ -public interface IDistributorStrategy { +public interface IDistributorStrategy extends OutputPortRemovedListener { public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java index 3f7f22e77df2b7439cb463fe18c4b88204e1d714..de7df44939c81d24ff6698996dfd12f32368823a 100644 --- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java @@ -16,6 +16,7 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -43,4 +44,11 @@ public final class RoundRobinStrategy implements IDistributorStrategy { return outputPort; } + @Override + public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) stage; + // correct the index if it is out-of-bounds + this.index = this.index % distributor.getOutputPorts().length; + } + } diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java index 829e9f7eb797d54d5070805449fe3e94ffd6643e..fb11cefcad5287f01e8e5a14976a5e2137302802 100644 --- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java @@ -16,6 +16,7 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.framework.Stage; /** * @author Christian Wulf @@ -33,8 +34,9 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { int numLoops = numOutputPorts; boolean success; + OutputPort<T> outputPort; do { - final OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); + outputPort = getNextPortInRoundRobinOrder(outputPorts); success = outputPort.sendNonBlocking(element); if (0 == numLoops) { numWaits++; @@ -44,6 +46,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { numLoops--; } while (!success); + System.out.println("Sent " + element + " via " + outputPort); + return true; } @@ -69,4 +73,11 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { return numWaits; } + @Override + public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) stage; + // correct the index if it is out-of-bounds + this.index = this.index % distributor.getOutputPorts().length; + } + } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java index a61c394997c3af2b5ddf0a15875e67c93d57ab62..cd6ef44b3297a46585081acd70518783cd4b0e86 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java @@ -1,5 +1,6 @@ package teetime.stage.basic.distributor.dynamic; + class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { @Override @@ -7,4 +8,9 @@ class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { return portActions.take(); } + // @Override + // protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes + // return super.getOutputPorts(); + // } + } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 1d91c76cd86df3a1c2d5add603d2e92581bf682b..7141ff95c0ce3c3973cf30dd2296b6d5094a9e1b 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -25,6 +25,7 @@ public class CreatePortAction<T> implements PortAction<T> { @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { + System.out.println("Creating..."); OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); @@ -37,5 +38,6 @@ public class CreatePortAction<T> implements PortAction<T> { newOutputPort.sendSignal(new StartingSignal()); // FIXME pass the new thread to the analysis so that it can terminate the thread at the end + System.out.println("Created."); } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index d3f9c39e39e6601226d300781eb3bfb5ab9bcf20..0520d8880396be715100ea70f2487f242bfe82d7 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -40,7 +40,7 @@ public class DynamicDistributor<T> extends Distributor<T> { private void checkForPendingPortActionRequest() throws InterruptedException { PortAction<T> dynamicPortAction = getPortAction(); - if (null != dynamicPortAction) { + if (null != dynamicPortAction) { // check if getPortAction() uses polling dynamicPortAction.execute(this); } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java new file mode 100644 index 0000000000000000000000000000000000000000..848445ff3219c04a80520244da9375a5523bac9c --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -0,0 +1,34 @@ +package teetime.stage.basic.distributor.dynamic; + +import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; +import teetime.framework.signal.TerminatingSignal; + +public class RemovePortAction<T> implements PortAction<T> { + + private final DynamicOutputPort<T> outputPort; + + public RemovePortAction(final DynamicOutputPort<T> outputPort) { + super(); + this.outputPort = outputPort; + } + + // public DynamicOutputPort<T> getOutputPort() { + // return outputPort; + // } + + @Override + public void execute(final DynamicDistributor<T> dynamicDistributor) { + System.out.println("Removing..."); + if (dynamicDistributor instanceof ControlledDynamicDistributor) { + OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts(); + OutputPort<?> outputPortToRemove = outputPorts[outputPorts.length - 1]; + // outputPortToRemove = outputPort; + + outputPortToRemove.sendSignal(new TerminatingSignal()); + + dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove); + } + System.out.println("Removed."); + } +} diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java index b2db943f5cea02e17bbc0172a77c5f6f79adb9c3..74841da74264807720f73eeea2baa24bbaff79ef 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java @@ -14,13 +14,9 @@ import org.junit.Test; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; import teetime.framework.Stage; +import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; -import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor; -import teetime.stage.basic.distributor.dynamic.CreatePortAction; -import teetime.stage.basic.distributor.dynamic.DoNothingPortAction; -import teetime.stage.basic.distributor.dynamic.DynamicDistributor; -import teetime.stage.basic.distributor.dynamic.PortAction; public class ControlledDistributorTest { @@ -40,7 +36,8 @@ public class ControlledDistributorTest { List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions); - Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config); + Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); analysis.executeBlocking(); @@ -48,34 +45,22 @@ public class ControlledDistributorTest { } @Test - public void shouldWorkWithActionTriggers() throws Exception { + public void shouldWorkWithCreateActionTriggers() throws Exception { List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); @SuppressWarnings("unchecked") PortAction<Integer>[] inputActions = new PortAction[5]; for (int i = 0; i < inputActions.length; i++) { - CollectorSink<Integer> newStage = new CollectorSink<Integer>(); - - // Runnable runnable = dynamicActuator.wrap(newStage); - // Thread thread = new Thread(runnable); - // thread.start(); - - PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort()); + PortAction<Integer> createAction = createPortCreateAction(); inputActions[i] = createAction; } ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); - Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config); + Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); analysis.executeBlocking(); - for (PortAction<Integer> ia : inputActions) { - Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); - @SuppressWarnings("unchecked") - CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; - System.out.println("collectorSink: " + collectorSink.getElements()); - } - assertThat(config.getOutputElements(), contains(0)); assertValuesForIndex(inputActions, Arrays.asList(1), 0); assertValuesForIndex(inputActions, Arrays.asList(2), 1); @@ -84,6 +69,37 @@ public class ControlledDistributorTest { assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4); } + @Test + public void shouldWorkWithRemoveActionTriggers() throws Exception { + List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5); + + @SuppressWarnings("unchecked") + PortAction<Integer>[] inputActions = new PortAction[6]; + inputActions[0] = createPortCreateAction(); + inputActions[1] = new RemovePortAction<Integer>(null); + inputActions[2] = createPortCreateAction(); + inputActions[3] = createPortCreateAction(); + inputActions[4] = new RemovePortAction<Integer>(null); + inputActions[5] = new RemovePortAction<Integer>(null); + + ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); + + analysis.executeBlocking(); + + assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); + assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0); + assertValuesForIndex(inputActions, Arrays.asList(3), 2); + assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); + } + + private PortAction<Integer> createPortCreateAction() { + CollectorSink<Integer> newStage = new CollectorSink<Integer>(); + PortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + return portAction; + } + private void assertValuesForIndex(final PortAction<Integer>[] inputActions, final List<Integer> values, final int index) { PortAction<Integer> ia = inputActions[index]; @@ -99,16 +115,13 @@ public class ControlledDistributorTest { public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) { InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); - // InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions); DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>(); collectorSink = new CollectorSink<T>(); connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); - // connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); addThreadableStage(initialElementProducer); - // addThreadableStage(initialActionProducer); // simulates the AdaptationThread addThreadableStage(distributor); addThreadableStage(collectorSink);