From aa7b2c930209b8f4521cc99759509afc6e600d23 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 19 Mar 2015 14:12:45 +0100 Subject: [PATCH] added a working more efficient distributor strategy --- .../framework/AbstractIntraThreadPipe.java | 5 ++++ .../java/teetime/framework/OutputPort.java | 16 ++++++++++-- .../teetime/framework/pipe/DummyPipe.java | 7 +++++- .../java/teetime/framework/pipe/IPipe.java | 11 +++++++- .../teetime/framework/pipe/RelayTestPipe.java | 5 ++++ .../java/teetime/framework/pipe/SpScPipe.java | 25 +++++++++++-------- .../framework/pipe/UnboundedSpScPipe.java | 5 ++++ .../distributor/RoundRobinStrategy2.java | 10 ++++++-- .../stage/basic/merger/MergerTestingPipe.java | 5 ++++ 9 files changed, 73 insertions(+), 16 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 4efd0cbf..47f3af1d 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -25,6 +25,11 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { super(sourcePort, targetPort); } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public final void sendSignal(final ISignal signal) { // getTargetPort is always non-null since the framework adds dummy ports if necessary diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 50a54d6a..904e567b 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -24,6 +24,18 @@ public final class OutputPort<T> extends AbstractPort<T> { super(); } + /** + * + * Guarantees the delivery of the given <code>element</code>. + * + * @param element + * to be sent; May not be <code>null</code>. + * + */ + public void send(final T element) { + this.pipe.add(element); + } + /** * * @param element @@ -31,8 +43,8 @@ public final class OutputPort<T> extends AbstractPort<T> { * * @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise. */ - public boolean send(final T element) { - return this.pipe.add(element); + public boolean sendNonBlocking(final T element) { + return this.pipe.addNonBlocking(element); } /** diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 0ca1ea9b..6cdd1fe9 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -32,7 +32,12 @@ public final class DummyPipe implements IPipe { @Override public boolean add(final Object element) { - return false; + return true; + } + + @Override + public boolean addNonBlocking(final Object element) { + return add(element); } @Override diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index f15b9f5e..5758e345 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -31,7 +31,16 @@ public interface IPipe { * Element which will be added * @return <code>true</code> if the element could be added, false otherwise */ - boolean add(Object element); + boolean add(Object element); // TODO correct javadoc: no return type since guarantee of element delivery + + /** + * Adds an element to the Pipe. + * + * @param element + * Element which will be added + * @return <code>true</code> if the element could be added, false otherwise + */ + boolean addNonBlocking(Object element); /** * Checks whether the pipe is empty or not. diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 3b112db9..9afebf5f 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -34,6 +34,11 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe { return false; } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public T removeLast() { if (this.numInputObjects == 0) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index b4d5d0a5..106506de 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -47,17 +47,22 @@ public final class SpScPipe extends AbstractInterThreadPipe { @Override public boolean add(final Object element) { // BETTER introduce a QueueIsFullStrategy - // while (!this.queue.offer(element)) { - // this.numWaits++; - // // Thread.yield(); - // try { - // Thread.sleep(0); - // } catch (InterruptedException e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - // } + while (!this.queue.offer(element)) { + this.numWaits++; + // Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } // this.reportNewElement(); + return true; + } + + @Override + public boolean addNonBlocking(final Object element) { return this.queue.offer(element); } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index 424e6b49..ded5c049 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -41,6 +41,11 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe { return this.queue.offer(element); } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public Object removeLast() { return this.queue.poll(); diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java index 49350b97..402daddc 100644 --- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java @@ -18,13 +18,14 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ public final class RoundRobinStrategy2 implements IDistributorStrategy { private int index = 0; + private int numWaits; @Override public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { @@ -33,9 +34,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { boolean success; do { OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); - success = outputPort.send(element); + success = outputPort.sendNonBlocking(element); numLoops--; if (0 == numLoops) { + numWaits++; // Thread.yield(); try { Thread.sleep(1); @@ -59,4 +61,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { return outputPort; } + public int getNumWaits() { + return numWaits; + } + } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 5cf29ff7..9b0e3904 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -51,6 +51,11 @@ public class MergerTestingPipe implements IPipe { return false; } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public boolean isEmpty() { // TODO Auto-generated method stub -- GitLab