Skip to content
Snippets Groups Projects
Commit 009cd6ed authored by Christian Wulf's avatar Christian Wulf
Browse files

improved performance of RoundRobinStrategy

parent dd1534c6
No related branches found
No related tags found
No related merge requests found
Showing
with 232 additions and 49 deletions
......@@ -24,18 +24,18 @@ import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.util.list.container.ContainerIterator;
import teetime.util.list.container.ContainerList;
public abstract class AbstractStage extends Stage {
private static final IPipe DUMMY_PORT = new DummyPipe();
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
protected final ContainerList<OutputPort<?>> outputPortList = new ContainerList<OutputPort<?>>();
/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
protected InputPort<?>[] cachedInputPorts = new InputPort[0];
/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
protected OutputPort<?>[] cachedOutputPorts;
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
// BETTER aggregate both states in an enum
......@@ -51,11 +51,8 @@ public abstract class AbstractStage extends Stage {
return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around
}
/**
* @return the stage's output ports
*/
protected OutputPort<?>[] getOutputPorts() {
return this.cachedOutputPorts;
public ContainerList<OutputPort<?>> getOutputPorts() {
return outputPortList;
}
/**
......@@ -67,7 +64,8 @@ public abstract class AbstractStage extends Stage {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);
for (OutputPort<?> outputPort : this.outputPortList) {
ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
outputPort.sendSignal(signal);
}
}
......@@ -103,7 +101,6 @@ public abstract class AbstractStage extends Stage {
public void onStarting() throws Exception {
this.owningThread = Thread.currentThread();
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
started = true;
......@@ -112,7 +109,8 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
if (null == outputPort.getPipe()) { // if port is unconnected
this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
outputPort.setPipe(DUMMY_PORT);
......@@ -141,8 +139,9 @@ public abstract class AbstractStage extends Stage {
*
* @return Newly added OutputPort
*/
protected <T> OutputPort<T> createOutputPort() {
final OutputPort<T> outputPort = new OutputPort<T>();
// TODO: add final Class<T> type
protected <E> OutputPort<E> createOutputPort() {
final OutputPort<E> outputPort = new OutputPort<E>();
// outputPort.setType(portType);
this.outputPortList.add(outputPort);
return outputPort;
......@@ -151,8 +150,8 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
// for (OutputPort<?> outputPort : this.getOutputPorts()) {
for (OutputPort<?> outputPort : this.outputPortList) {
ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
final IPipe pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
final Class<?> sourcePortType = outputPort.getType();
......
......@@ -16,16 +16,22 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.util.list.container.ContainerList;
/**
* @author Nils Christian Ehmke
* @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
public final class CloneStrategy implements IDistributorStrategy {
public final class CloneStrategy<T> implements IDistributorStrategy<T> {
@Override
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
public void init(final ContainerList<OutputPort<?>> outputPorts) {
throw new UnsupportedOperationException();
}
@Override
public boolean distribute(final T element) {
throw new UnsupportedOperationException();
}
......
......@@ -16,21 +16,35 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.util.list.container.ContainerIterator;
import teetime.util.list.container.ContainerList;
/**
* @author Nils Christian Ehmke
* @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
public final class CopyByReferenceStrategy implements IDistributorStrategy {
public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> {
private ContainerIterator<OutputPort<?>> containerIterator;
@Override
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
for (final OutputPort<T> outputPort : outputPorts) {
outputPort.send(element);
public void init(final ContainerList<OutputPort<?>> outputPorts) {
if (outputPorts.getSize() == 0) {
throw new IllegalStateException("The number of output ports may not be 0.");
}
containerIterator = new ContainerIterator<OutputPort<?>>(outputPorts.getHead());
}
@Override
public boolean distribute(final T element) {
containerIterator.init();
do {
@SuppressWarnings("unchecked")
OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next();
outputPort.send(element);
} while (containerIterator.hasNext());
return true;
}
}
......@@ -26,33 +26,38 @@ import teetime.framework.OutputPort;
* @param T
* the type of the input port and the output ports
*/
public class Distributor<T> extends AbstractConsumerStage<T> {
public final class Distributor<T> extends AbstractConsumerStage<T> {
private IDistributorStrategy strategy;
private IDistributorStrategy<T> strategy;
public Distributor() {
this(new RoundRobinStrategy());
this(new RoundRobinStrategy<T>());
}
public Distributor(final IDistributorStrategy strategy) {
public Distributor(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
@SuppressWarnings("unchecked")
@Override
public void onStarting() throws Exception {
this.strategy.init(this.getOutputPorts());
super.onStarting();
}
@Override
protected void execute(final T element) {
this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element);
this.strategy.distribute(element);
}
public OutputPort<T> getNewOutputPort() {
return this.createOutputPort();
}
public IDistributorStrategy getStrategy() {
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
public void setStrategy(final IDistributorStrategy strategy) {
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
......
......@@ -16,14 +16,18 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.util.list.container.ContainerList;
/**
* @author Nils Christian Ehmke
* @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
public interface IDistributorStrategy {
public interface IDistributorStrategy<T> {
public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
// we get List<OutputPort<?>> from AbstractStage, so we cannot expect a specific type here
public void init(ContainerList<OutputPort<?>> outputPorts);
public boolean distribute(T element);
}
......@@ -16,31 +16,29 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.util.list.container.ContainerList;
import teetime.util.list.container.InfiniteStepByStepIterator;
/**
* @author Nils Christian Ehmke
* @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
public final class RoundRobinStrategy implements IDistributorStrategy {
public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
private int index = 0;
private InfiniteStepByStepIterator<OutputPort<?>> containerIterator;
@Override
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
public void init(final ContainerList<OutputPort<?>> outputPorts) {
containerIterator = new InfiniteStepByStepIterator<OutputPort<?>>(outputPorts.getHead());
}
@Override
public boolean distribute(final T element) {
@SuppressWarnings("unchecked")
OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next();
outputPort.send(element);
return true;
}
private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
final OutputPort<T> outputPort = outputPorts[this.index];
this.index = (this.index + 1) % outputPorts.length;
return outputPort;
}
}
package teetime.util.list.container;
public class Container<T> {
private final T element;
private Container<T> next;
public Container(final T element) {
super();
this.element = element;
}
public T getElement() {
return element;
}
public Container<T> getNext() {
return next;
}
void setNext(final Container<T> next) {
this.next = next;
}
}
package teetime.util.list.container;
import java.util.Iterator;
public final class ContainerIterator<T> implements Iterator<T> {
private final Container<T> head;
private Container<T> currentContainer;
public ContainerIterator(final Container<T> head) {
this.head = head;
}
public T init() {
currentContainer = head;
return currentContainer.getElement();
}
@Override
public boolean hasNext() {
return currentContainer != head;
}
@Override
public T next() {
final Container<T> currentContainer = this.currentContainer;
this.currentContainer = currentContainer.getNext();
return currentContainer.getElement();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
package teetime.util.list.container;
public class ContainerList<T> {
private Container<T> head;
private Container<T> tail;
private int size;
// public ContainerList(final Collection<T> elements) {
// for (T e : elements) {
// add(e);
// }
// }
public ContainerList() {
super();
}
/**
*
* @param element
* to be added to the tail
*/
public void add(final T element) {
Container<T> newContainer = new Container<T>(element);
if (head == null) {
head = tail = newContainer;
}
tail.setNext(newContainer);
tail = newContainer;
newContainer.setNext(head); // circularly
size++;
}
public Container<T> getHead() {
return head;
}
// public Container<T> getTail() {
// return tail;
// }
public int getSize() {
return size;
}
}
package teetime.util.list.container;
import java.util.Iterator;
public final class InfiniteStepByStepIterator<T> implements Iterator<T> {
private Container<T> currentContainer;
public InfiniteStepByStepIterator(final Container<T> head) {
this.currentContainer = head;
}
@Override
public boolean hasNext() {
return true;
}
@Override
public T next() {
final Container<T> currentContainer = this.currentContainer;
this.currentContainer = currentContainer.getNext();
return currentContainer.getElement();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
package teetime.util.list.container;
public class RoundRobinContainer<T> {
private Container<T> currentContainer;
public RoundRobinContainer(final ContainerList<T> list) {
currentContainer = list.getHead();
}
public T getNextElement() {
final Container<T> currentContainer = this.currentContainer;
this.currentContainer = currentContainer.getNext();
return currentContainer.getElement();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment