diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 12c339e67f152c75c7bf61a94660dc44f2d38320..16e91ddd7a3bcaf8ff0259bf24fafefcc0b3df84 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -24,18 +24,18 @@ import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; +import teetime.util.list.container.ContainerIterator; +import teetime.util.list.container.ContainerList; public abstract class AbstractStage extends Stage { private static final IPipe DUMMY_PORT = new DummyPipe(); private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); - private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); + protected final ContainerList<OutputPort<?>> outputPortList = new ContainerList<OutputPort<?>>(); /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ protected InputPort<?>[] cachedInputPorts = new InputPort[0]; - /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ - protected OutputPort<?>[] cachedOutputPorts; private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); // BETTER aggregate both states in an enum @@ -51,11 +51,8 @@ public abstract class AbstractStage extends Stage { return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around } - /** - * @return the stage's output ports - */ - protected OutputPort<?>[] getOutputPorts() { - return this.cachedOutputPorts; + public ContainerList<OutputPort<?>> getOutputPorts() { + return outputPortList; } /** @@ -67,7 +64,8 @@ public abstract class AbstractStage extends Stage { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); - for (OutputPort<?> outputPort : this.outputPortList) { + ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead()); + for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) { outputPort.sendSignal(signal); } } @@ -103,7 +101,6 @@ public abstract class AbstractStage extends Stage { public void onStarting() throws Exception { this.owningThread = Thread.currentThread(); this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); - this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); started = true; @@ -112,7 +109,8 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.cachedOutputPorts) { + ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead()); + for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) { if (null == outputPort.getPipe()) { // if port is unconnected this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); outputPort.setPipe(DUMMY_PORT); @@ -141,8 +139,9 @@ public abstract class AbstractStage extends Stage { * * @return Newly added OutputPort */ - protected <T> OutputPort<T> createOutputPort() { - final OutputPort<T> outputPort = new OutputPort<T>(); + // TODO: add final Class<T> type + protected <E> OutputPort<E> createOutputPort() { + final OutputPort<E> outputPort = new OutputPort<E>(); // outputPort.setType(portType); this.outputPortList.add(outputPort); return outputPort; @@ -151,8 +150,8 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - // for (OutputPort<?> outputPort : this.getOutputPorts()) { - for (OutputPort<?> outputPort : this.outputPortList) { + ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead()); + for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) { final IPipe pipe = outputPort.getPipe(); if (null != pipe) { // if output port is connected with another one final Class<?> sourcePortType = outputPort.getType(); diff --git a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java index 1264c6b81c8c187a1b9b29ff7fe4408ef0ca9bea..f1376e3b726c0b46e66a6cc38500b6603c91833e 100644 --- a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java @@ -16,16 +16,22 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.util.list.container.ContainerList; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public final class CloneStrategy implements IDistributorStrategy { +public final class CloneStrategy<T> implements IDistributorStrategy<T> { @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { + public void init(final ContainerList<OutputPort<?>> outputPorts) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean distribute(final T element) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java index 5ca51a061f6606071a8109596055e3e7857a4ff9..54a8aed63bd07cb930e642d70f44c6639de36ad1 100644 --- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java @@ -16,21 +16,35 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.util.list.container.ContainerIterator; +import teetime.util.list.container.ContainerList; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public final class CopyByReferenceStrategy implements IDistributorStrategy { +public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> { + + private ContainerIterator<OutputPort<?>> containerIterator; @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> outputPort : outputPorts) { - outputPort.send(element); + public void init(final ContainerList<OutputPort<?>> outputPorts) { + if (outputPorts.getSize() == 0) { + throw new IllegalStateException("The number of output ports may not be 0."); } + containerIterator = new ContainerIterator<OutputPort<?>>(outputPorts.getHead()); + } + + @Override + public boolean distribute(final T element) { + containerIterator.init(); + do { + @SuppressWarnings("unchecked") + OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next(); + outputPort.send(element); + } while (containerIterator.hasNext()); return true; } - } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index e591cfd20aa189a5f4ae305dc42933c9c4107620..1e82045d05f94d77c0b6a7a6c06370b1c785316f 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -26,33 +26,38 @@ import teetime.framework.OutputPort; * @param T * the type of the input port and the output ports */ -public class Distributor<T> extends AbstractConsumerStage<T> { +public final class Distributor<T> extends AbstractConsumerStage<T> { - private IDistributorStrategy strategy; + private IDistributorStrategy<T> strategy; public Distributor() { - this(new RoundRobinStrategy()); + this(new RoundRobinStrategy<T>()); } - public Distributor(final IDistributorStrategy strategy) { + public Distributor(final IDistributorStrategy<T> strategy) { this.strategy = strategy; } - @SuppressWarnings("unchecked") + @Override + public void onStarting() throws Exception { + this.strategy.init(this.getOutputPorts()); + super.onStarting(); + } + @Override protected void execute(final T element) { - this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); + this.strategy.distribute(element); } public OutputPort<T> getNewOutputPort() { return this.createOutputPort(); } - public IDistributorStrategy getStrategy() { + public IDistributorStrategy<T> getStrategy() { return this.strategy; } - public void setStrategy(final IDistributorStrategy strategy) { + public void setStrategy(final IDistributorStrategy<T> strategy) { this.strategy = strategy; } diff --git a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java index cc0946a5f6341c14b32178d016d6477515e40fbc..d7143828d5b1baa5476400c2c7fe70846767507e 100644 --- a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java @@ -16,14 +16,18 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.util.list.container.ContainerList; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public interface IDistributorStrategy { +public interface IDistributorStrategy<T> { - public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); + // we get List<OutputPort<?>> from AbstractStage, so we cannot expect a specific type here + public void init(ContainerList<OutputPort<?>> outputPorts); + + public boolean distribute(T element); } diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java index cdf05f46ecb16d8aaad4fd66116c88433f535e10..f3c7ac622667547750e37fadf2f8d3710c7395fc 100644 --- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java @@ -16,31 +16,29 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; +import teetime.util.list.container.ContainerList; +import teetime.util.list.container.InfiniteStepByStepIterator; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public final class RoundRobinStrategy implements IDistributorStrategy { +public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { - private int index = 0; + private InfiniteStepByStepIterator<OutputPort<?>> containerIterator; @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); + public void init(final ContainerList<OutputPort<?>> outputPorts) { + containerIterator = new InfiniteStepByStepIterator<OutputPort<?>>(outputPorts.getHead()); + } + @Override + public boolean distribute(final T element) { + @SuppressWarnings("unchecked") + OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next(); outputPort.send(element); - return true; } - private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { - final OutputPort<T> outputPort = outputPorts[this.index]; - - this.index = (this.index + 1) % outputPorts.length; - - return outputPort; - } - } diff --git a/src/main/java/teetime/util/list/container/Container.java b/src/main/java/teetime/util/list/container/Container.java new file mode 100644 index 0000000000000000000000000000000000000000..66305e76dfac67c27f47880dde73d3bc8151bdb8 --- /dev/null +++ b/src/main/java/teetime/util/list/container/Container.java @@ -0,0 +1,25 @@ +package teetime.util.list.container; + + +public class Container<T> { + + private final T element; + private Container<T> next; + + public Container(final T element) { + super(); + this.element = element; + } + + public T getElement() { + return element; + } + + public Container<T> getNext() { + return next; + } + + void setNext(final Container<T> next) { + this.next = next; + } +} diff --git a/src/main/java/teetime/util/list/container/ContainerIterator.java b/src/main/java/teetime/util/list/container/ContainerIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..cf3fb84cb8f165e1c1df553d99e052d008e12629 --- /dev/null +++ b/src/main/java/teetime/util/list/container/ContainerIterator.java @@ -0,0 +1,37 @@ +package teetime.util.list.container; + +import java.util.Iterator; + +public final class ContainerIterator<T> implements Iterator<T> { + + private final Container<T> head; + + private Container<T> currentContainer; + + public ContainerIterator(final Container<T> head) { + this.head = head; + } + + public T init() { + currentContainer = head; + return currentContainer.getElement(); + } + + @Override + public boolean hasNext() { + return currentContainer != head; + } + + @Override + public T next() { + final Container<T> currentContainer = this.currentContainer; + this.currentContainer = currentContainer.getNext(); + return currentContainer.getElement(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/teetime/util/list/container/ContainerList.java b/src/main/java/teetime/util/list/container/ContainerList.java new file mode 100644 index 0000000000000000000000000000000000000000..c8c39ae7831a83739aa5729a8298274249f26346 --- /dev/null +++ b/src/main/java/teetime/util/list/container/ContainerList.java @@ -0,0 +1,49 @@ +package teetime.util.list.container; + + +public class ContainerList<T> { + + private Container<T> head; + private Container<T> tail; + private int size; + + // public ContainerList(final Collection<T> elements) { + // for (T e : elements) { + // add(e); + // } + // } + + public ContainerList() { + super(); + } + + /** + * + * @param element + * to be added to the tail + */ + public void add(final T element) { + Container<T> newContainer = new Container<T>(element); + if (head == null) { + head = tail = newContainer; + } + tail.setNext(newContainer); + tail = newContainer; + newContainer.setNext(head); // circularly + + size++; + } + + public Container<T> getHead() { + return head; + } + + // public Container<T> getTail() { + // return tail; + // } + + public int getSize() { + return size; + } + +} diff --git a/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java b/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..ecc01b5b046ce78c8030d5cfedf73eabd1dd70d1 --- /dev/null +++ b/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java @@ -0,0 +1,30 @@ +package teetime.util.list.container; + +import java.util.Iterator; + +public final class InfiniteStepByStepIterator<T> implements Iterator<T> { + + private Container<T> currentContainer; + + public InfiniteStepByStepIterator(final Container<T> head) { + this.currentContainer = head; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public T next() { + final Container<T> currentContainer = this.currentContainer; + this.currentContainer = currentContainer.getNext(); + return currentContainer.getElement(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/teetime/util/list/container/RoundRobinContainer.java b/src/main/java/teetime/util/list/container/RoundRobinContainer.java new file mode 100644 index 0000000000000000000000000000000000000000..55ecdadee874c4a67615dc98996089c8da863356 --- /dev/null +++ b/src/main/java/teetime/util/list/container/RoundRobinContainer.java @@ -0,0 +1,16 @@ +package teetime.util.list.container; + +public class RoundRobinContainer<T> { + + private Container<T> currentContainer; + + public RoundRobinContainer(final ContainerList<T> list) { + currentContainer = list.getHead(); + } + + public T getNextElement() { + final Container<T> currentContainer = this.currentContainer; + this.currentContainer = currentContainer.getNext(); + return currentContainer.getElement(); + } +}