diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 4efd0cbfd14ab0fb53f3f74053c0d4400dbc10f2..47f3af1d875c0ffbbb6140d1929d2e5793d28671 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -25,6 +25,11 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { super(sourcePort, targetPort); } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public final void sendSignal(final ISignal signal) { // getTargetPort is always non-null since the framework adds dummy ports if necessary diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 50a54d6a4d2af89c013de029dfc04b71ee242142..904e567b6b9e11d5cef3d02ccc476a84cc6cea13 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -24,6 +24,18 @@ public final class OutputPort<T> extends AbstractPort<T> { super(); } + /** + * + * Guarantees the delivery of the given <code>element</code>. + * + * @param element + * to be sent; May not be <code>null</code>. + * + */ + public void send(final T element) { + this.pipe.add(element); + } + /** * * @param element @@ -31,8 +43,8 @@ public final class OutputPort<T> extends AbstractPort<T> { * * @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise. */ - public boolean send(final T element) { - return this.pipe.add(element); + public boolean sendNonBlocking(final T element) { + return this.pipe.addNonBlocking(element); } /** diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 0ca1ea9b07584f9b768a93a5b8a65381fb03a71f..6cdd1fe960adb09b213458e366915964ee8b859d 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -32,7 +32,12 @@ public final class DummyPipe implements IPipe { @Override public boolean add(final Object element) { - return false; + return true; + } + + @Override + public boolean addNonBlocking(final Object element) { + return add(element); } @Override diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index f15b9f5e9d9066f49254ca70d8a54c9cef2e8e95..5758e345fd01cea5607a0545bc197a0a9d51e6d0 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -31,7 +31,16 @@ public interface IPipe { * Element which will be added * @return <code>true</code> if the element could be added, false otherwise */ - boolean add(Object element); + boolean add(Object element); // TODO correct javadoc: no return type since guarantee of element delivery + + /** + * Adds an element to the Pipe. + * + * @param element + * Element which will be added + * @return <code>true</code> if the element could be added, false otherwise + */ + boolean addNonBlocking(Object element); /** * Checks whether the pipe is empty or not. diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 3b112db9ddd9ab4b966bc3f406f32ebe7690a99b..9afebf5f51f6fd485ec8f580dfa237b61c8e58d4 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -34,6 +34,11 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe { return false; } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public T removeLast() { if (this.numInputObjects == 0) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index b4d5d0a5f74c896f260715dda516b8c5017ccd3b..106506de63287c84e55649fb665c472531943805 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -47,17 +47,22 @@ public final class SpScPipe extends AbstractInterThreadPipe { @Override public boolean add(final Object element) { // BETTER introduce a QueueIsFullStrategy - // while (!this.queue.offer(element)) { - // this.numWaits++; - // // Thread.yield(); - // try { - // Thread.sleep(0); - // } catch (InterruptedException e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - // } + while (!this.queue.offer(element)) { + this.numWaits++; + // Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } // this.reportNewElement(); + return true; + } + + @Override + public boolean addNonBlocking(final Object element) { return this.queue.offer(element); } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index 424e6b494812c8cfd548d630d10be832b2861bf5..ded5c049f8ab3d3a28f1bfe07bb5d3aab5ad2658 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -41,6 +41,11 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe { return this.queue.offer(element); } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public Object removeLast() { return this.queue.poll(); diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java index 49350b9751850ccf68696d0c0db5a6cd22eb0089..402daddc74d99e019bb848bfef27b1b3a9b39d04 100644 --- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java @@ -18,13 +18,14 @@ package teetime.stage.basic.distributor; import teetime.framework.OutputPort; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ public final class RoundRobinStrategy2 implements IDistributorStrategy { private int index = 0; + private int numWaits; @Override public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { @@ -33,9 +34,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { boolean success; do { OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); - success = outputPort.send(element); + success = outputPort.sendNonBlocking(element); numLoops--; if (0 == numLoops) { + numWaits++; // Thread.yield(); try { Thread.sleep(1); @@ -59,4 +61,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { return outputPort; } + public int getNumWaits() { + return numWaits; + } + } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 5cf29ff72592bce09de069099b75406e03df3e35..9b0e3904c1ef14a643ab5620fb6df67ce4f03747 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -51,6 +51,11 @@ public class MergerTestingPipe implements IPipe { return false; } + @Override + public boolean addNonBlocking(final Object element) { + return add(element); + } + @Override public boolean isEmpty() { // TODO Auto-generated method stub