From a0d0750e9694496443a520ad20170424fa6079e5 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 24 Jun 2015 06:12:29 +0200 Subject: [PATCH] added DynamicMerger --- .../framework/AbstractInterThreadPipe.java | 10 +- .../java/teetime/framework/AbstractStage.java | 33 ++++- .../teetime/framework/DynamicActuator.java | 15 ++- .../teetime/framework/DynamicInputPort.java | 29 ++++ .../java/teetime/framework/InputPort.java | 2 +- .../framework/InputPortRemovedListener.java | 7 + src/main/java/teetime/framework/Stage.java | 2 + .../java/teetime/framework/pipe/SpScPipe.java | 2 +- .../dynamic/ControlledDynamicDistributor.java | 9 +- .../distributor/dynamic/CreatePortAction.java | 12 +- .../dynamic/DoNothingPortAction.java | 4 +- .../dynamic/DynamicDistributor.java | 55 +++----- .../basic/distributor/dynamic/PortAction.java | 8 -- .../distributor/dynamic/RemovePortAction.java | 6 +- .../teetime/stage/basic/merger/Merger.java | 25 ++-- .../dynamic/ControlledDynamicMerger.java | 15 +++ .../merger/dynamic/CreatePortAction.java | 28 ++++ .../merger/dynamic/DoNothingPortAction.java | 12 ++ .../basic/merger/dynamic/DynamicMerger.java | 37 ++++++ .../merger/dynamic/RemovePortAction.java | 30 +++++ .../queue/ObservableSpScArrayQueue.java | 2 +- .../concurrent/queue/PCBlockingQueue.java | 6 +- .../queue/putstrategy/PutStrategy.java | 2 +- .../queue/putstrategy/YieldPutStrategy.java | 2 +- .../takestrategy/SCParkTakeStrategy.java | 2 +- .../queue/takestrategy/TakeStrategy.java | 2 +- .../queue/takestrategy/YieldTakeStrategy.java | 2 +- .../util/{ => framework}/list/ArrayPool.java | 2 +- .../{ => framework}/list/CircularList.java | 2 +- .../list/CommittableQueue.java | 2 +- .../list/CommittableResizableArrayQueue.java | 2 +- .../{ => framework}/list/ListContainer.java | 2 +- .../list/ListContainerPool.java | 2 +- .../util/{ => framework}/list/ObjectPool.java | 2 +- .../list/ObjectPooledLinkedList.java | 2 +- .../util/framework/port/PortAction.java | 9 ++ .../util/framework/port/PortActionHelper.java | 45 +++++++ .../dynamic/ControlledDistributorTest.java | 33 ++--- .../stage/basic/merger/MergerTest.java | 6 +- .../merger/dynamic/ControlledMergerTest.java | 125 ++++++++++++++++++ .../teetime/stage/io/File2SeqOfWordsTest.java | 6 +- 41 files changed, 476 insertions(+), 123 deletions(-) create mode 100644 src/main/java/teetime/framework/DynamicInputPort.java create mode 100644 src/main/java/teetime/framework/InputPortRemovedListener.java delete mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java create mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java create mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java create mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/DoNothingPortAction.java create mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java create mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java rename src/main/java/teetime/util/{ => framework}/concurrent/queue/ObservableSpScArrayQueue.java (96%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/PCBlockingQueue.java (95%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/putstrategy/PutStrategy.java (92%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/putstrategy/YieldPutStrategy.java (93%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/takestrategy/SCParkTakeStrategy.java (96%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/takestrategy/TakeStrategy.java (92%) rename src/main/java/teetime/util/{ => framework}/concurrent/queue/takestrategy/YieldTakeStrategy.java (94%) rename src/main/java/teetime/util/{ => framework}/list/ArrayPool.java (96%) rename src/main/java/teetime/util/{ => framework}/list/CircularList.java (97%) rename src/main/java/teetime/util/{ => framework}/list/CommittableQueue.java (96%) rename src/main/java/teetime/util/{ => framework}/list/CommittableResizableArrayQueue.java (99%) rename src/main/java/teetime/util/{ => framework}/list/ListContainer.java (95%) rename src/main/java/teetime/util/{ => framework}/list/ListContainerPool.java (97%) rename src/main/java/teetime/util/{ => framework}/list/ObjectPool.java (95%) rename src/main/java/teetime/util/{ => framework}/list/ObjectPooledLinkedList.java (97%) create mode 100644 src/main/java/teetime/util/framework/port/PortAction.java create mode 100644 src/main/java/teetime/util/framework/port/PortActionHelper.java create mode 100644 src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 9ed1d4c9..55058eed 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -26,11 +26,11 @@ import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; -import teetime.util.concurrent.queue.PCBlockingQueue; -import teetime.util.concurrent.queue.putstrategy.PutStrategy; -import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; -import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy; -import teetime.util.concurrent.queue.takestrategy.TakeStrategy; +import teetime.util.framework.concurrent.queue.PCBlockingQueue; +import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; +import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; +import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy; +import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy; public abstract class AbstractInterThreadPipe extends AbstractPipe { diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index cf68ec0d..7cc7b101 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -37,6 +37,7 @@ public abstract class AbstractStage extends Stage { private StageState currentState = StageState.CREATED; private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>(); + private final Set<InputPortRemovedListener> inputPortsRemovedListeners = new HashSet<InputPortRemovedListener>(); @Override public InputPort<?>[] getInputPorts() { @@ -252,17 +253,43 @@ public abstract class AbstractStage extends Stage { } outputPorts = tempOutputPorts.toArray(new OutputPort[0]); - fireOutputPortRemoved(removedOutputPort); + firePortRemoved(removedOutputPort); } - private void fireOutputPortRemoved(final OutputPort<?> removedOutputPort) { + private void firePortRemoved(final OutputPort<?> removedOutputPort) { for (OutputPortRemovedListener listener : outputPortRemovedListeners) { listener.onOutputPortRemoved(this, removedOutputPort); } } - protected void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) { + protected final void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) { outputPortRemovedListeners.add(outputPortRemovedListener); } + @Override + protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { + int index = dynamicInputPort.getIndex(); + List<InputPort<?>> tempInputPorts = new ArrayList<InputPort<?>>(Arrays.asList(inputPorts)); + InputPort<?> removedInputPort = tempInputPorts.remove(index); + for (int i = index; i < tempInputPorts.size(); i++) { + InputPort<?> inputPort = tempInputPorts.get(i); + if (inputPort instanceof DynamicInputPort) { + ((DynamicInputPort<?>) inputPort).setIndex(i); + } + } + inputPorts = tempInputPorts.toArray(new InputPort[0]); + + firePortRemoved(removedInputPort); + } + + private void firePortRemoved(final InputPort<?> removedInputPort) { + for (InputPortRemovedListener listener : inputPortsRemovedListeners) { + listener.onInputPortRemoved(this, removedInputPort); + } + } + + protected final void addInputPortRemovedListener(final InputPortRemovedListener outputPortRemovedListener) { + inputPortsRemovedListeners.add(outputPortRemovedListener); + } + } diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index c53abdd4..bd8e6dd3 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -2,7 +2,20 @@ package teetime.framework; public class DynamicActuator { + /** + * @deprecated Use {@link #startWithinNewThread(Stage)} instead. + */ + @Deprecated public Runnable wrap(final Stage stage) { - return new RunnableConsumerStage(stage); + if (stage.getInputPorts().length > 0) { + return new RunnableConsumerStage(stage); + } + return new RunnableProducerStage(stage); + } + + public void startWithinNewThread(final Stage stage) { + Runnable runnable = wrap(stage); + Thread thread = new Thread(runnable); + thread.start(); } } diff --git a/src/main/java/teetime/framework/DynamicInputPort.java b/src/main/java/teetime/framework/DynamicInputPort.java new file mode 100644 index 00000000..d8119afc --- /dev/null +++ b/src/main/java/teetime/framework/DynamicInputPort.java @@ -0,0 +1,29 @@ +package teetime.framework; + +/** + * + * @author Christian Wulf + * + * @param <T> + * the type of elements to be received + * + * @since 1.2 + */ +public final class DynamicInputPort<T> extends InputPort<T> { + + private int index; + + DynamicInputPort(final Class<T> type, final Stage owningStage, final int index) { + super(type, owningStage); + this.index = index; + } + + public int getIndex() { + return index; + } + + public void setIndex(final int index) { + this.index = index; + } + +} diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d2ffc259..4ff0a7d8 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -24,7 +24,7 @@ package teetime.framework; * * @since 1.0 */ -public final class InputPort<T> extends AbstractPort<T> { +public class InputPort<T> extends AbstractPort<T> { InputPort(final Class<T> type, final Stage owningStage) { super(type, owningStage); diff --git a/src/main/java/teetime/framework/InputPortRemovedListener.java b/src/main/java/teetime/framework/InputPortRemovedListener.java new file mode 100644 index 00000000..2268facf --- /dev/null +++ b/src/main/java/teetime/framework/InputPortRemovedListener.java @@ -0,0 +1,7 @@ +package teetime.framework; + +public interface InputPortRemovedListener { + + void onInputPortRemoved(Stage stage, InputPort<?> removedInputPort); + +} diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 5f6b0d41..1c9ce2e2 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -150,4 +150,6 @@ public abstract class Stage { protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort); + protected abstract void removeDynamicPort(DynamicInputPort<?> dynamicInputPort); + } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index d515bcd5..6450934d 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -19,7 +19,7 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.StageState; -import teetime.util.concurrent.queue.ObservableSpScArrayQueue; +import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe { diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java index cd6ef44b..4d079651 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java @@ -1,11 +1,16 @@ package teetime.stage.basic.distributor.dynamic; +import teetime.util.framework.port.PortActionHelper; class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { @Override - protected PortAction<T> getPortAction() throws InterruptedException { - return portActions.take(); + protected void checkForPendingPortActionRequest() { + try { + PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } } // @Override diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 5c1cf8fa..f1c15653 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -6,8 +6,9 @@ import teetime.framework.OutputPort; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; +import teetime.util.framework.port.PortAction; -public class CreatePortAction<T> implements PortAction<T> { +public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator(); @@ -15,7 +16,6 @@ public class CreatePortAction<T> implements PortAction<T> { private final InputPort<T> inputPort; public CreatePortAction(final InputPort<T> inputPort) { - super(); this.inputPort = inputPort; } @@ -23,11 +23,13 @@ public class CreatePortAction<T> implements PortAction<T> { public void execute(final DynamicDistributor<T> dynamicDistributor) { OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); + onOutputPortCreated(newOutputPort); + } + + private void onOutputPortCreated(final OutputPort<? extends T> newOutputPort) { INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); - Runnable runnable = DYNAMIC_ACTUATOR.wrap(inputPort.getOwningStage()); - Thread thread = new Thread(runnable); - thread.start(); + DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); newOutputPort.sendSignal(new InitializingSignal()); newOutputPort.sendSignal(new StartingSignal()); diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java index bb4779a4..e7e6a321 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java @@ -1,6 +1,8 @@ package teetime.stage.basic.distributor.dynamic; -public class DoNothingPortAction<T> implements PortAction<T> { +import teetime.util.framework.port.PortAction; + +public class DoNothingPortAction<T> implements PortAction<DynamicDistributor<T>> { @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index 0520d888..ab49def6 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -1,60 +1,47 @@ package teetime.stage.basic.distributor.dynamic; -import java.util.Queue; - -import org.jctools.queues.QueueFactory; -import org.jctools.queues.spec.ConcurrentQueueSpec; -import org.jctools.queues.spec.Ordering; -import org.jctools.queues.spec.Preference; +import java.util.concurrent.BlockingQueue; import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; +import teetime.framework.OutputPortRemovedListener; +import teetime.framework.Stage; +import teetime.framework.signal.TerminatingSignal; import teetime.stage.basic.distributor.Distributor; -import teetime.util.concurrent.queue.PCBlockingQueue; -import teetime.util.concurrent.queue.putstrategy.PutStrategy; -import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; -import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy; -import teetime.util.concurrent.queue.takestrategy.TakeStrategy; +import teetime.util.framework.port.PortAction; +import teetime.util.framework.port.PortActionHelper; -public class DynamicDistributor<T> extends Distributor<T> { +public class DynamicDistributor<T> extends Distributor<T> implements OutputPortRemovedListener { - protected final PCBlockingQueue<PortAction<T>> portActions; + protected final BlockingQueue<PortAction<DynamicDistributor<T>>> portActions; public DynamicDistributor() { - final Queue<PortAction<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); - final PutStrategy<PortAction<T>> putStrategy = new YieldPutStrategy<PortAction<T>>(); - final TakeStrategy<PortAction<T>> takeStrategy = new SCParkTakeStrategy<PortAction<T>>(); - portActions = new PCBlockingQueue<PortAction<T>>(localQueue, putStrategy, takeStrategy); + portActions = PortActionHelper.createPortActionQueue(); + addOutputPortRemovedListener(this); } @Override protected void execute(final T element) { - try { - checkForPendingPortActionRequest(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + checkForPendingPortActionRequest(); super.execute(element); } - private void checkForPendingPortActionRequest() throws InterruptedException { - PortAction<T> dynamicPortAction = getPortAction(); - if (null != dynamicPortAction) { // check if getPortAction() uses polling - dynamicPortAction.execute(this); - } - } - - protected PortAction<T> getPortAction() throws InterruptedException { - return portActions.poll(); + protected void checkForPendingPortActionRequest() { + PortActionHelper.checkForPendingPortActionRequest(this, portActions); } @Override - public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { + public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { // make public super.removeDynamicPort(dynamicOutputPort); } - public boolean addPortActionRequest(final PortAction<T> newPortActionRequest) { + public boolean addPortActionRequest(final PortAction<DynamicDistributor<T>> newPortActionRequest) { return portActions.offer(newPortActionRequest); } + + @Override + public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + removedOutputPort.sendSignal(new TerminatingSignal()); + } } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java deleted file mode 100644 index 885b26f1..00000000 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java +++ /dev/null @@ -1,8 +0,0 @@ -package teetime.stage.basic.distributor.dynamic; - - -public interface PortAction<T> { - - public abstract void execute(final DynamicDistributor<T> dynamicDistributor); - -} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 46d7f09f..345f5c59 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -2,9 +2,9 @@ package teetime.stage.basic.distributor.dynamic; import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; -import teetime.framework.signal.TerminatingSignal; +import teetime.util.framework.port.PortAction; -public class RemovePortAction<T> implements PortAction<T> { +public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { private final DynamicOutputPort<T> outputPort; @@ -25,8 +25,6 @@ public class RemovePortAction<T> implements PortAction<T> { outputPortToRemove = outputPort; } - outputPortToRemove.sendSignal(new TerminatingSignal()); - dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 3a4672ca..20eb73bf 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -37,18 +37,19 @@ import teetime.framework.signal.ISignal; * @param <T> * the type of both the input and output ports */ -public final class Merger<T> extends AbstractStage { +public class Merger<T> extends AbstractStage { private final OutputPort<T> outputPort = this.createOutputPort(); - private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); - private IMergerStrategy strategy; + private final IMergerStrategy strategy; + private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; public Merger() { this(new RoundRobinStrategy()); } public Merger(final IMergerStrategy strategy) { + this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); this.strategy = strategy; } @@ -80,19 +81,19 @@ public final class Merger<T> extends AbstractStage { Class<? extends ISignal> signalClass = signal.getClass(); - Set<InputPort<?>> inputPorts; + Set<InputPort<?>> signalReceivedInputPorts; if (signalMap.containsKey(signalClass)) { - inputPorts = signalMap.get(signalClass); + signalReceivedInputPorts = signalMap.get(signalClass); } else { - inputPorts = new HashSet<InputPort<?>>(); - signalMap.put(signalClass, inputPorts); + signalReceivedInputPorts = new HashSet<InputPort<?>>(); + signalMap.put(signalClass, signalReceivedInputPorts); } - if (!inputPorts.add(inputPort)) { + if (!signalReceivedInputPorts.add(inputPort)) { this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); } - if (signal.mayBeTriggered(inputPorts, getInputPorts())) { + if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) { super.onSignal(signal, inputPort); } } @@ -101,12 +102,8 @@ public final class Merger<T> extends AbstractStage { return this.strategy; } - public void setStrategy(final IMergerStrategy strategy) { - this.strategy = strategy; - } - @Override - public InputPort<?>[] getInputPorts() { + public InputPort<?>[] getInputPorts() { // make public return super.getInputPorts(); } diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java new file mode 100644 index 00000000..6a905b64 --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java @@ -0,0 +1,15 @@ +package teetime.stage.basic.merger.dynamic; + +import teetime.util.framework.port.PortActionHelper; + +public class ControlledDynamicMerger<T> extends DynamicMerger<T> { + + @Override + protected void checkForPendingPortActionRequest() { + try { + PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java new file mode 100644 index 00000000..0356cc55 --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -0,0 +1,28 @@ +package teetime.stage.basic.merger.dynamic; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.util.framework.port.PortAction; + +public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { + + private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); + + private final OutputPort<T> outputPort; + + public CreatePortAction(final OutputPort<T> outputPort) { + this.outputPort = outputPort; + } + + @Override + public void execute(final DynamicMerger<T> dynamicDistributor) { + InputPort<T> newInputPort = dynamicDistributor.getNewInputPort(); + + onInputPortCreated(newInputPort); + } + + private void onInputPortCreated(final InputPort<T> newInputPort) { + INTER_THREAD_PIPE_FACTORY.create(outputPort, newInputPort); + } +} diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DoNothingPortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/DoNothingPortAction.java new file mode 100644 index 00000000..519bb033 --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/dynamic/DoNothingPortAction.java @@ -0,0 +1,12 @@ +package teetime.stage.basic.merger.dynamic; + +import teetime.util.framework.port.PortAction; + +public class DoNothingPortAction<T> implements PortAction<DynamicMerger<T>> { + + @Override + public void execute(final DynamicMerger<T> dynamicDistributor) { + // do nothing for testing purpose + } + +} diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java new file mode 100644 index 00000000..c4de167d --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java @@ -0,0 +1,37 @@ +package teetime.stage.basic.merger.dynamic; + +import java.util.concurrent.BlockingQueue; + +import teetime.framework.DynamicInputPort; +import teetime.stage.basic.merger.Merger; +import teetime.util.framework.port.PortAction; +import teetime.util.framework.port.PortActionHelper; + +public class DynamicMerger<T> extends Merger<T> { + + protected final BlockingQueue<PortAction<DynamicMerger<T>>> portActions; + + public DynamicMerger() { + portActions = PortActionHelper.createPortActionQueue(); + } + + @Override + public void executeStage() { + checkForPendingPortActionRequest(); + + super.executeStage(); + } + + protected void checkForPendingPortActionRequest() { + PortActionHelper.checkForPendingPortActionRequest(this, portActions); + } + + @Override + public void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { // make public + super.removeDynamicPort(dynamicInputPort); + } + + public boolean addPortActionRequest(final PortAction<DynamicMerger<T>> newPortActionRequest) { + return portActions.offer(newPortActionRequest); + } +} diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java new file mode 100644 index 00000000..6abcebde --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -0,0 +1,30 @@ +package teetime.stage.basic.merger.dynamic; + +import teetime.framework.DynamicInputPort; +import teetime.framework.InputPort; +import teetime.util.framework.port.PortAction; + +public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { + + private final DynamicInputPort<T> inputPort; + + public RemovePortAction(final DynamicInputPort<T> inputPort) { + super(); + this.inputPort = inputPort; + } + + @Override + public void execute(final DynamicMerger<T> dynamicMerger) { + InputPort<?> inputPortsToRemove; + + if (dynamicMerger instanceof ControlledDynamicMerger) { + // for testing purposes only + InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts(); + inputPortsToRemove = inputPorts[inputPorts.length - 1]; + } else { + inputPortsToRemove = inputPort; + } + + dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove); + } +} diff --git a/src/main/java/teetime/util/concurrent/queue/ObservableSpScArrayQueue.java b/src/main/java/teetime/util/framework/concurrent/queue/ObservableSpScArrayQueue.java similarity index 96% rename from src/main/java/teetime/util/concurrent/queue/ObservableSpScArrayQueue.java rename to src/main/java/teetime/util/framework/concurrent/queue/ObservableSpScArrayQueue.java index a1d253a4..cba858e3 100644 --- a/src/main/java/teetime/util/concurrent/queue/ObservableSpScArrayQueue.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/ObservableSpScArrayQueue.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue; +package teetime.util.framework.concurrent.queue; import org.jctools.queues.SpscArrayQueue; diff --git a/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java b/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java similarity index 95% rename from src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java rename to src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java index adf2678d..8289abb1 100644 --- a/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue; +package teetime.util.framework.concurrent.queue; import java.util.Collection; import java.util.Iterator; @@ -21,8 +21,8 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import teetime.util.concurrent.queue.putstrategy.PutStrategy; -import teetime.util.concurrent.queue.takestrategy.TakeStrategy; +import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; +import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy; public final class PCBlockingQueue<E> implements BlockingQueue<E> { diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/putstrategy/PutStrategy.java similarity index 92% rename from src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java rename to src/main/java/teetime/util/framework/concurrent/queue/putstrategy/PutStrategy.java index 90043b7b..d8e3158e 100644 --- a/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/putstrategy/PutStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue.putstrategy; +package teetime.util.framework.concurrent.queue.putstrategy; import java.util.Queue; diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/putstrategy/YieldPutStrategy.java similarity index 93% rename from src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java rename to src/main/java/teetime/util/framework/concurrent/queue/putstrategy/YieldPutStrategy.java index 4f3da0be..dfdc60d0 100644 --- a/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/putstrategy/YieldPutStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue.putstrategy; +package teetime.util.framework.concurrent.queue.putstrategy; import java.util.Queue; diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCParkTakeStrategy.java similarity index 96% rename from src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java rename to src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCParkTakeStrategy.java index ad814812..61122694 100644 --- a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCParkTakeStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue.takestrategy; +package teetime.util.framework.concurrent.queue.takestrategy; import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/TakeStrategy.java similarity index 92% rename from src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java rename to src/main/java/teetime/util/framework/concurrent/queue/takestrategy/TakeStrategy.java index d1fb3b9d..69dad73d 100644 --- a/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/TakeStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue.takestrategy; +package teetime.util.framework.concurrent.queue.takestrategy; import java.util.Queue; diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/YieldTakeStrategy.java similarity index 94% rename from src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java rename to src/main/java/teetime/util/framework/concurrent/queue/takestrategy/YieldTakeStrategy.java index 9cd971f2..bfc4abdf 100644 --- a/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/YieldTakeStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.concurrent.queue.takestrategy; +package teetime.util.framework.concurrent.queue.takestrategy; import java.util.Queue; diff --git a/src/main/java/teetime/util/list/ArrayPool.java b/src/main/java/teetime/util/framework/list/ArrayPool.java similarity index 96% rename from src/main/java/teetime/util/list/ArrayPool.java rename to src/main/java/teetime/util/framework/list/ArrayPool.java index 40c23cc5..95056fea 100644 --- a/src/main/java/teetime/util/list/ArrayPool.java +++ b/src/main/java/teetime/util/framework/list/ArrayPool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; import java.util.HashMap; import java.util.Map; diff --git a/src/main/java/teetime/util/list/CircularList.java b/src/main/java/teetime/util/framework/list/CircularList.java similarity index 97% rename from src/main/java/teetime/util/list/CircularList.java rename to src/main/java/teetime/util/framework/list/CircularList.java index bded6dd1..1037fbb7 100644 --- a/src/main/java/teetime/util/list/CircularList.java +++ b/src/main/java/teetime/util/framework/list/CircularList.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public final class CircularList<T> { diff --git a/src/main/java/teetime/util/list/CommittableQueue.java b/src/main/java/teetime/util/framework/list/CommittableQueue.java similarity index 96% rename from src/main/java/teetime/util/list/CommittableQueue.java rename to src/main/java/teetime/util/framework/list/CommittableQueue.java index 8c31a75f..cb5fb964 100644 --- a/src/main/java/teetime/util/list/CommittableQueue.java +++ b/src/main/java/teetime/util/framework/list/CommittableQueue.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public interface CommittableQueue<T> { diff --git a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java b/src/main/java/teetime/util/framework/list/CommittableResizableArrayQueue.java similarity index 99% rename from src/main/java/teetime/util/list/CommittableResizableArrayQueue.java rename to src/main/java/teetime/util/framework/list/CommittableResizableArrayQueue.java index 709d22d7..cc595755 100644 --- a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java +++ b/src/main/java/teetime/util/framework/list/CommittableResizableArrayQueue.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public final class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { diff --git a/src/main/java/teetime/util/list/ListContainer.java b/src/main/java/teetime/util/framework/list/ListContainer.java similarity index 95% rename from src/main/java/teetime/util/list/ListContainer.java rename to src/main/java/teetime/util/framework/list/ListContainer.java index dd6d66a2..9281474f 100644 --- a/src/main/java/teetime/util/list/ListContainer.java +++ b/src/main/java/teetime/util/framework/list/ListContainer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public final class ListContainer<T> { diff --git a/src/main/java/teetime/util/list/ListContainerPool.java b/src/main/java/teetime/util/framework/list/ListContainerPool.java similarity index 97% rename from src/main/java/teetime/util/list/ListContainerPool.java rename to src/main/java/teetime/util/framework/list/ListContainerPool.java index 8d1d7f94..e8f4f833 100644 --- a/src/main/java/teetime/util/list/ListContainerPool.java +++ b/src/main/java/teetime/util/framework/list/ListContainerPool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; import java.util.ArrayList; import java.util.List; diff --git a/src/main/java/teetime/util/list/ObjectPool.java b/src/main/java/teetime/util/framework/list/ObjectPool.java similarity index 95% rename from src/main/java/teetime/util/list/ObjectPool.java rename to src/main/java/teetime/util/framework/list/ObjectPool.java index 490dfcf7..a787f6a3 100644 --- a/src/main/java/teetime/util/list/ObjectPool.java +++ b/src/main/java/teetime/util/framework/list/ObjectPool.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public interface ObjectPool<T> { diff --git a/src/main/java/teetime/util/list/ObjectPooledLinkedList.java b/src/main/java/teetime/util/framework/list/ObjectPooledLinkedList.java similarity index 97% rename from src/main/java/teetime/util/list/ObjectPooledLinkedList.java rename to src/main/java/teetime/util/framework/list/ObjectPooledLinkedList.java index 12b17638..307061e9 100644 --- a/src/main/java/teetime/util/list/ObjectPooledLinkedList.java +++ b/src/main/java/teetime/util/framework/list/ObjectPooledLinkedList.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package teetime.util.list; +package teetime.util.framework.list; public final class ObjectPooledLinkedList<T> { diff --git a/src/main/java/teetime/util/framework/port/PortAction.java b/src/main/java/teetime/util/framework/port/PortAction.java new file mode 100644 index 00000000..09f24bd7 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortAction.java @@ -0,0 +1,9 @@ +package teetime.util.framework.port; + +import teetime.framework.Stage; + +public interface PortAction<T extends Stage> { + + public abstract void execute(final T stage); + +} diff --git a/src/main/java/teetime/util/framework/port/PortActionHelper.java b/src/main/java/teetime/util/framework/port/PortActionHelper.java new file mode 100644 index 00000000..ac7c2e23 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortActionHelper.java @@ -0,0 +1,45 @@ +package teetime.util.framework.port; + +import java.util.Queue; +import java.util.concurrent.BlockingQueue; + +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; + +import teetime.framework.Stage; +import teetime.util.framework.concurrent.queue.PCBlockingQueue; +import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; +import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; +import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy; +import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy; + +public final class PortActionHelper { + + private PortActionHelper() { + // utility class + } + + public static <T> BlockingQueue<T> createPortActionQueue() { + final Queue<T> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); + final PutStrategy<T> putStrategy = new YieldPutStrategy<T>(); + final TakeStrategy<T> takeStrategy = new SCParkTakeStrategy<T>(); + PCBlockingQueue<T> portActions = new PCBlockingQueue<T>(localQueue, putStrategy, takeStrategy); + return portActions; + } + + public static <T extends Stage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) { + PortAction<T> dynamicPortAction = portActions.poll(); + if (null != dynamicPortAction) { // check if getPortAction() uses polling + dynamicPortAction.execute(stage); + } + } + + public static <T extends Stage> void checkBlockingForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) + throws InterruptedException { + PortAction<T> dynamicPortAction = portActions.take(); + dynamicPortAction.execute(stage); + } + +} diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java index d988259d..cfc63a0f 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java @@ -1,14 +1,13 @@ package teetime.stage.basic.distributor.dynamic; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.junit.Before; import org.junit.Test; import teetime.framework.ConfigurationContext; @@ -17,23 +16,17 @@ import teetime.framework.Stage; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; +import teetime.util.framework.port.PortAction; public class ControlledDistributorTest { - // private ControlledDistributor<Integer> controlledDistributor; - - @Before - public void setUp() throws Exception { - // controlledDistributor = new ControlledDistributor<Integer>(); - } - @Test public void shouldWorkWithoutActionTriggers() throws Exception { - PortAction<Integer> createAction = new DoNothingPortAction<Integer>(); + PortAction<DynamicDistributor<Integer>> createAction = new DoNothingPortAction<Integer>(); List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); @SuppressWarnings("unchecked") - List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); + List<PortAction<DynamicDistributor<Integer>>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions); Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config, @@ -49,9 +42,9 @@ public class ControlledDistributorTest { List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); @SuppressWarnings("unchecked") - PortAction<Integer>[] inputActions = new PortAction[5]; + PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[5]; for (int i = 0; i < inputActions.length; i++) { - PortAction<Integer> createAction = createPortCreateAction(); + PortAction<DynamicDistributor<Integer>> createAction = createPortCreateAction(); inputActions[i] = createAction; } @@ -74,7 +67,7 @@ public class ControlledDistributorTest { List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5); @SuppressWarnings("unchecked") - PortAction<Integer>[] inputActions = new PortAction[6]; + PortAction<DynamicDistributor<Integer>>[] inputActions = new PortAction[6]; inputActions[0] = createPortCreateAction(); inputActions[1] = new RemovePortAction<Integer>(null); inputActions[2] = createPortCreateAction(); @@ -94,15 +87,15 @@ public class ControlledDistributorTest { assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); } - private PortAction<Integer> createPortCreateAction() { + private PortAction<DynamicDistributor<Integer>> createPortCreateAction() { CollectorSink<Integer> newStage = new CollectorSink<Integer>(); - PortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); + PortAction<DynamicDistributor<Integer>> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); return portAction; } - private void assertValuesForIndex(final PortAction<Integer>[] inputActions, + private void assertValuesForIndex(final PortAction<DynamicDistributor<Integer>>[] inputActions, final List<Integer> values, final int index) { - PortAction<Integer> ia = inputActions[index]; + PortAction<DynamicDistributor<Integer>> ia = inputActions[index]; Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); @SuppressWarnings("unchecked") CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; @@ -113,7 +106,7 @@ public class ControlledDistributorTest { private final CollectorSink<T> collectorSink; - public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) { + public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<DynamicDistributor<T>>> inputActions) { InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>(); collectorSink = new CollectorSink<T>(); @@ -124,7 +117,7 @@ public class ControlledDistributorTest { addThreadableStage(distributor); addThreadableStage(collectorSink); - for (PortAction<T> a : portActions) { + for (PortAction<DynamicDistributor<T>> a : inputActions) { distributor.addPortActionRequest(a); } } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTest.java b/src/test/java/teetime/stage/basic/merger/MergerTest.java index c61f0cce..6306cd1c 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTest.java @@ -34,8 +34,7 @@ public class MergerTest { @Test public void roundRobinShouldWork() { - Merger<Integer> mergerUnderTest = new Merger<Integer>(); - mergerUnderTest.setStrategy(new RoundRobinStrategy()); + Merger<Integer> mergerUnderTest = new Merger<Integer>(new RoundRobinStrategy()); List<Integer> mergedElements = new ArrayList<Integer>(); @@ -50,8 +49,7 @@ public class MergerTest { @Test public void roundRobinWithSingleProducerShouldWork() { - Merger<Integer> mergerUnderTest = new Merger<Integer>(); - mergerUnderTest.setStrategy(new RoundRobinStrategy()); + Merger<Integer> mergerUnderTest = new Merger<Integer>(new RoundRobinStrategy()); List<Integer> mergedElements = new ArrayList<Integer>(); diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java new file mode 100644 index 00000000..2cb73b4b --- /dev/null +++ b/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java @@ -0,0 +1,125 @@ +package teetime.stage.basic.merger.dynamic; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +import teetime.framework.ConfigurationContext; +import teetime.framework.DynamicActuator; +import teetime.framework.Execution; +import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; +import teetime.framework.signal.InitializingSignal; +import teetime.framework.signal.StartingSignal; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.util.framework.port.PortAction; + +public class ControlledMergerTest { + + private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator(); + + @Test + public void shouldWorkWithoutActionTriggers() throws Exception { + PortAction<DynamicMerger<Integer>> createAction = new DoNothingPortAction<Integer>(); + + List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); + @SuppressWarnings("unchecked") + List<PortAction<DynamicMerger<Integer>>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); + + ControlledMergerTestConfig<Integer> config = new ControlledMergerTestConfig<Integer>(inputNumbers, inputActions); + Execution<ControlledMergerTestConfig<Integer>> analysis = new Execution<ControlledMergerTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); + + analysis.executeBlocking(); + + assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4)); + } + + @Test + public void shouldWorkWithCreateActionTriggers() throws Exception { + List<Integer> inputNumbers = Arrays.asList(0); + + @SuppressWarnings("unchecked") + PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[5]; + for (int i = 0; i < inputActions.length; i++) { + inputActions[i] = createPortCreateAction(i + 1); + } + + ControlledMergerTestConfig<Integer> config = new ControlledMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + Execution<ControlledMergerTestConfig<Integer>> analysis = new Execution<ControlledMergerTestConfig<Integer>>(config, + new TerminatingExceptionListenerFactory()); + + analysis.executeBlocking(); + + assertThat(config.getOutputElements(), containsInAnyOrder(0, 1, 2, 3, 4, 5, 6)); + } + + @Test + public void shouldWorkWithRemoveActionTriggers() throws Exception { + List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5); + + @SuppressWarnings("unchecked") + PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6]; + // inputActions[0] = createPortCreateAction(); + // inputActions[1] = new RemovePortAction<Integer>(null); + // inputActions[2] = createPortCreateAction(); + // inputActions[3] = createPortCreateAction(); + // inputActions[4] = new RemovePortAction<Integer>(null); + // inputActions[5] = new RemovePortAction<Integer>(null); + // + // ControlledMergerTestConfig<Integer> config = new ControlledMergerTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); + // Execution<ControlledMergerTestConfig<Integer>> analysis = new Execution<ControlledMergerTestConfig<Integer>>(config, + // new TerminatingExceptionListenerFactory()); + // + // analysis.executeBlocking(); + // + // assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); + // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0); + // assertValuesForIndex(inputActions, Arrays.asList(3), 2); + // assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); + } + + private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) { + final InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(number); + DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer); + + PortAction<DynamicMerger<Integer>> portAction = new CreatePortAction<Integer>(initialElementProducer.getOutputPort()) { + @Override + public void execute(final DynamicMerger<Integer> dynamicDistributor) { + super.execute(dynamicDistributor); + initialElementProducer.getOutputPort().sendSignal(new InitializingSignal()); + initialElementProducer.getOutputPort().sendSignal(new StartingSignal()); + } + }; + return portAction; + } + + private static class ControlledMergerTestConfig<T> extends ConfigurationContext { + + private final CollectorSink<T> collectorSink; + + public ControlledMergerTestConfig(final List<T> elements, final List<PortAction<DynamicMerger<T>>> inputActions) { + InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); + DynamicMerger<T> merger = new ControlledDynamicMerger<T>(); + collectorSink = new CollectorSink<T>(); + + connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort()); + connectPorts(merger.getOutputPort(), collectorSink.getInputPort()); + + addThreadableStage(merger); + + for (PortAction<DynamicMerger<T>> a : inputActions) { + merger.addPortActionRequest(a); + } + } + + public List<T> getOutputElements() { + return collectorSink.getElements(); + } + } +} diff --git a/src/test/java/teetime/stage/io/File2SeqOfWordsTest.java b/src/test/java/teetime/stage/io/File2SeqOfWordsTest.java index d6990c69..211f4e77 100644 --- a/src/test/java/teetime/stage/io/File2SeqOfWordsTest.java +++ b/src/test/java/teetime/stage/io/File2SeqOfWordsTest.java @@ -31,10 +31,10 @@ public class File2SeqOfWordsTest { @Test public void testExecute() throws Exception { File2SeqOfWords stage = new File2SeqOfWords(14); - List<String> outputList = new ArrayList<String>(); - StageTester.test(stage).send(Arrays.asList(new File("./src/test/resources/data/input.txt"))).to(stage.getInputPort()).and().receive(outputList) + List<String> outputSeqOfWords = new ArrayList<String>(); + StageTester.test(stage).send(Arrays.asList(new File("./src/test/resources/data/input.txt"))).to(stage.getInputPort()).and().receive(outputSeqOfWords) .from(stage.getOutputPort()).start(); - assertEquals(outputList.get(0), "Lorem ipsum"); + assertEquals(outputSeqOfWords.get(0), "Lorem ipsum"); } } -- GitLab