From 009cd6edfed25eaf14ce43836f3cdcdc449f768e Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 15 Feb 2015 19:03:08 +0100
Subject: [PATCH] improved performance of RoundRobinStrategy
---
.../java/teetime/framework/AbstractStage.java | 29 ++++++-----
.../basic/distributor/CloneStrategy.java | 12 +++--
.../distributor/CopyByReferenceStrategy.java | 26 +++++++---
.../stage/basic/distributor/Distributor.java | 21 +++++---
.../distributor/IDistributorStrategy.java | 10 ++--
.../basic/distributor/RoundRobinStrategy.java | 26 +++++-----
.../util/list/container/Container.java | 25 ++++++++++
.../list/container/ContainerIterator.java | 37 ++++++++++++++
.../util/list/container/ContainerList.java | 49 +++++++++++++++++++
.../container/InfiniteStepByStepIterator.java | 30 ++++++++++++
.../list/container/RoundRobinContainer.java | 16 ++++++
11 files changed, 232 insertions(+), 49 deletions(-)
create mode 100644 src/main/java/teetime/util/list/container/Container.java
create mode 100644 src/main/java/teetime/util/list/container/ContainerIterator.java
create mode 100644 src/main/java/teetime/util/list/container/ContainerList.java
create mode 100644 src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java
create mode 100644 src/main/java/teetime/util/list/container/RoundRobinContainer.java
diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 12c339e6..16e91ddd 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -24,18 +24,18 @@ import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
+import teetime.util.list.container.ContainerIterator;
+import teetime.util.list.container.ContainerList;
public abstract class AbstractStage extends Stage {
private static final IPipe DUMMY_PORT = new DummyPipe();
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
- private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
+ protected final ContainerList<OutputPort<?>> outputPortList = new ContainerList<OutputPort<?>>();
/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
protected InputPort<?>[] cachedInputPorts = new InputPort[0];
- /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
- protected OutputPort<?>[] cachedOutputPorts;
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
// BETTER aggregate both states in an enum
@@ -51,11 +51,8 @@ public abstract class AbstractStage extends Stage {
return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around
}
- /**
- * @return the stage's output ports
- */
- protected OutputPort<?>[] getOutputPorts() {
- return this.cachedOutputPorts;
+ public ContainerList<OutputPort<?>> getOutputPorts() {
+ return outputPortList;
}
/**
@@ -67,7 +64,8 @@ public abstract class AbstractStage extends Stage {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);
- for (OutputPort<?> outputPort : this.outputPortList) {
+ ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
+ for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
outputPort.sendSignal(signal);
}
}
@@ -103,7 +101,6 @@ public abstract class AbstractStage extends Stage {
public void onStarting() throws Exception {
this.owningThread = Thread.currentThread();
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
- this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
started = true;
@@ -112,7 +109,8 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
private void connectUnconnectedOutputPorts() {
- for (OutputPort<?> outputPort : this.cachedOutputPorts) {
+ ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
+ for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
if (null == outputPort.getPipe()) { // if port is unconnected
this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
outputPort.setPipe(DUMMY_PORT);
@@ -141,8 +139,9 @@ public abstract class AbstractStage extends Stage {
*
* @return Newly added OutputPort
*/
- protected <T> OutputPort<T> createOutputPort() {
- final OutputPort<T> outputPort = new OutputPort<T>();
+ // TODO: add final Class<T> type
+ protected <E> OutputPort<E> createOutputPort() {
+ final OutputPort<E> outputPort = new OutputPort<E>();
// outputPort.setType(portType);
this.outputPortList.add(outputPort);
return outputPort;
@@ -151,8 +150,8 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
- // for (OutputPort<?> outputPort : this.getOutputPorts()) {
- for (OutputPort<?> outputPort : this.outputPortList) {
+ ContainerIterator<OutputPort<?>> iter = new ContainerIterator<OutputPort<?>>(outputPortList.getHead());
+ for (OutputPort<?> outputPort = iter.init(); iter.hasNext(); outputPort = iter.next()) {
final IPipe pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
final Class<?> sourcePortType = outputPort.getType();
diff --git a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
index 1264c6b8..f1376e3b 100644
--- a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
@@ -16,16 +16,22 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
+import teetime.util.list.container.ContainerList;
/**
- * @author Nils Christian Ehmke
+ * @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
-public final class CloneStrategy implements IDistributorStrategy {
+public final class CloneStrategy<T> implements IDistributorStrategy<T> {
@Override
- public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
+ public void init(final ContainerList<OutputPort<?>> outputPorts) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean distribute(final T element) {
throw new UnsupportedOperationException();
}
diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
index 5ca51a06..54a8aed6 100644
--- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
@@ -16,21 +16,35 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
+import teetime.util.list.container.ContainerIterator;
+import teetime.util.list.container.ContainerList;
/**
- * @author Nils Christian Ehmke
+ * @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
-public final class CopyByReferenceStrategy implements IDistributorStrategy {
+public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> {
+
+ private ContainerIterator<OutputPort<?>> containerIterator;
@Override
- public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
- for (final OutputPort<T> outputPort : outputPorts) {
- outputPort.send(element);
+ public void init(final ContainerList<OutputPort<?>> outputPorts) {
+ if (outputPorts.getSize() == 0) {
+ throw new IllegalStateException("The number of output ports may not be 0.");
}
+ containerIterator = new ContainerIterator<OutputPort<?>>(outputPorts.getHead());
+ }
+
+ @Override
+ public boolean distribute(final T element) {
+ containerIterator.init();
+ do {
+ @SuppressWarnings("unchecked")
+ OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next();
+ outputPort.send(element);
+ } while (containerIterator.hasNext());
return true;
}
-
}
diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java
index e591cfd2..1e82045d 100644
--- a/src/main/java/teetime/stage/basic/distributor/Distributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java
@@ -26,33 +26,38 @@ import teetime.framework.OutputPort;
* @param T
* the type of the input port and the output ports
*/
-public class Distributor<T> extends AbstractConsumerStage<T> {
+public final class Distributor<T> extends AbstractConsumerStage<T> {
- private IDistributorStrategy strategy;
+ private IDistributorStrategy<T> strategy;
public Distributor() {
- this(new RoundRobinStrategy());
+ this(new RoundRobinStrategy<T>());
}
- public Distributor(final IDistributorStrategy strategy) {
+ public Distributor(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
- @SuppressWarnings("unchecked")
+ @Override
+ public void onStarting() throws Exception {
+ this.strategy.init(this.getOutputPorts());
+ super.onStarting();
+ }
+
@Override
protected void execute(final T element) {
- this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element);
+ this.strategy.distribute(element);
}
public OutputPort<T> getNewOutputPort() {
return this.createOutputPort();
}
- public IDistributorStrategy getStrategy() {
+ public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
- public void setStrategy(final IDistributorStrategy strategy) {
+ public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
diff --git a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
index cc0946a5..d7143828 100644
--- a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
@@ -16,14 +16,18 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
+import teetime.util.list.container.ContainerList;
/**
- * @author Nils Christian Ehmke
+ * @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
-public interface IDistributorStrategy {
+public interface IDistributorStrategy<T> {
- public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
+ // we get List<OutputPort<?>> from AbstractStage, so we cannot expect a specific type here
+ public void init(ContainerList<OutputPort<?>> outputPorts);
+
+ public boolean distribute(T element);
}
diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
index cdf05f46..f3c7ac62 100644
--- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
@@ -16,31 +16,29 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
+import teetime.util.list.container.ContainerList;
+import teetime.util.list.container.InfiniteStepByStepIterator;
/**
- * @author Nils Christian Ehmke
+ * @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
-public final class RoundRobinStrategy implements IDistributorStrategy {
+public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
- private int index = 0;
+ private InfiniteStepByStepIterator<OutputPort<?>> containerIterator;
@Override
- public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
- final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
+ public void init(final ContainerList<OutputPort<?>> outputPorts) {
+ containerIterator = new InfiniteStepByStepIterator<OutputPort<?>>(outputPorts.getHead());
+ }
+ @Override
+ public boolean distribute(final T element) {
+ @SuppressWarnings("unchecked")
+ OutputPort<T> outputPort = (OutputPort<T>) containerIterator.next();
outputPort.send(element);
-
return true;
}
- private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
- final OutputPort<T> outputPort = outputPorts[this.index];
-
- this.index = (this.index + 1) % outputPorts.length;
-
- return outputPort;
- }
-
}
diff --git a/src/main/java/teetime/util/list/container/Container.java b/src/main/java/teetime/util/list/container/Container.java
new file mode 100644
index 00000000..66305e76
--- /dev/null
+++ b/src/main/java/teetime/util/list/container/Container.java
@@ -0,0 +1,25 @@
+package teetime.util.list.container;
+
+
+public class Container<T> {
+
+ private final T element;
+ private Container<T> next;
+
+ public Container(final T element) {
+ super();
+ this.element = element;
+ }
+
+ public T getElement() {
+ return element;
+ }
+
+ public Container<T> getNext() {
+ return next;
+ }
+
+ void setNext(final Container<T> next) {
+ this.next = next;
+ }
+}
diff --git a/src/main/java/teetime/util/list/container/ContainerIterator.java b/src/main/java/teetime/util/list/container/ContainerIterator.java
new file mode 100644
index 00000000..cf3fb84c
--- /dev/null
+++ b/src/main/java/teetime/util/list/container/ContainerIterator.java
@@ -0,0 +1,37 @@
+package teetime.util.list.container;
+
+import java.util.Iterator;
+
+public final class ContainerIterator<T> implements Iterator<T> {
+
+ private final Container<T> head;
+
+ private Container<T> currentContainer;
+
+ public ContainerIterator(final Container<T> head) {
+ this.head = head;
+ }
+
+ public T init() {
+ currentContainer = head;
+ return currentContainer.getElement();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentContainer != head;
+ }
+
+ @Override
+ public T next() {
+ final Container<T> currentContainer = this.currentContainer;
+ this.currentContainer = currentContainer.getNext();
+ return currentContainer.getElement();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/src/main/java/teetime/util/list/container/ContainerList.java b/src/main/java/teetime/util/list/container/ContainerList.java
new file mode 100644
index 00000000..c8c39ae7
--- /dev/null
+++ b/src/main/java/teetime/util/list/container/ContainerList.java
@@ -0,0 +1,49 @@
+package teetime.util.list.container;
+
+
+public class ContainerList<T> {
+
+ private Container<T> head;
+ private Container<T> tail;
+ private int size;
+
+ // public ContainerList(final Collection<T> elements) {
+ // for (T e : elements) {
+ // add(e);
+ // }
+ // }
+
+ public ContainerList() {
+ super();
+ }
+
+ /**
+ *
+ * @param element
+ * to be added to the tail
+ */
+ public void add(final T element) {
+ Container<T> newContainer = new Container<T>(element);
+ if (head == null) {
+ head = tail = newContainer;
+ }
+ tail.setNext(newContainer);
+ tail = newContainer;
+ newContainer.setNext(head); // circularly
+
+ size++;
+ }
+
+ public Container<T> getHead() {
+ return head;
+ }
+
+ // public Container<T> getTail() {
+ // return tail;
+ // }
+
+ public int getSize() {
+ return size;
+ }
+
+}
diff --git a/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java b/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java
new file mode 100644
index 00000000..ecc01b5b
--- /dev/null
+++ b/src/main/java/teetime/util/list/container/InfiniteStepByStepIterator.java
@@ -0,0 +1,30 @@
+package teetime.util.list.container;
+
+import java.util.Iterator;
+
+public final class InfiniteStepByStepIterator<T> implements Iterator<T> {
+
+ private Container<T> currentContainer;
+
+ public InfiniteStepByStepIterator(final Container<T> head) {
+ this.currentContainer = head;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public T next() {
+ final Container<T> currentContainer = this.currentContainer;
+ this.currentContainer = currentContainer.getNext();
+ return currentContainer.getElement();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/src/main/java/teetime/util/list/container/RoundRobinContainer.java b/src/main/java/teetime/util/list/container/RoundRobinContainer.java
new file mode 100644
index 00000000..55ecdade
--- /dev/null
+++ b/src/main/java/teetime/util/list/container/RoundRobinContainer.java
@@ -0,0 +1,16 @@
+package teetime.util.list.container;
+
+public class RoundRobinContainer<T> {
+
+ private Container<T> currentContainer;
+
+ public RoundRobinContainer(final ContainerList<T> list) {
+ currentContainer = list.getHead();
+ }
+
+ public T getNextElement() {
+ final Container<T> currentContainer = this.currentContainer;
+ this.currentContainer = currentContainer.getNext();
+ return currentContainer.getElement();
+ }
+}
--
GitLab