Skip to content
Snippets Groups Projects
Commit aa7b2c93 authored by Christian Wulf's avatar Christian Wulf
Browse files

added a working more efficient distributor strategy

parent 35b33af2
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,11 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
super(sourcePort, targetPort);
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
public final void sendSignal(final ISignal signal) {
// getTargetPort is always non-null since the framework adds dummy ports if necessary
......
......
......@@ -24,6 +24,18 @@ public final class OutputPort<T> extends AbstractPort<T> {
super();
}
/**
*
* Guarantees the delivery of the given <code>element</code>.
*
* @param element
* to be sent; May not be <code>null</code>.
*
*/
public void send(final T element) {
this.pipe.add(element);
}
/**
*
* @param element
......@@ -31,8 +43,8 @@ public final class OutputPort<T> extends AbstractPort<T> {
*
* @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise.
*/
public boolean send(final T element) {
return this.pipe.add(element);
public boolean sendNonBlocking(final T element) {
return this.pipe.addNonBlocking(element);
}
/**
......
......
......@@ -32,7 +32,12 @@ public final class DummyPipe implements IPipe {
@Override
public boolean add(final Object element) {
return false;
return true;
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
......
......
......@@ -31,7 +31,16 @@ public interface IPipe {
* Element which will be added
* @return <code>true</code> if the element could be added, false otherwise
*/
boolean add(Object element);
boolean add(Object element); // TODO correct javadoc: no return type since guarantee of element delivery
/**
* Adds an element to the Pipe.
*
* @param element
* Element which will be added
* @return <code>true</code> if the element could be added, false otherwise
*/
boolean addNonBlocking(Object element);
/**
* Checks whether the pipe is empty or not.
......
......
......@@ -34,6 +34,11 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
return false;
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
public T removeLast() {
if (this.numInputObjects == 0) {
......
......
......@@ -47,17 +47,22 @@ public final class SpScPipe extends AbstractInterThreadPipe {
@Override
public boolean add(final Object element) {
// BETTER introduce a QueueIsFullStrategy
// while (!this.queue.offer(element)) {
// this.numWaits++;
// // Thread.yield();
// try {
// Thread.sleep(0);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
while (!this.queue.offer(element)) {
this.numWaits++;
// Thread.yield();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// this.reportNewElement();
return true;
}
@Override
public boolean addNonBlocking(final Object element) {
return this.queue.offer(element);
}
......
......
......@@ -41,6 +41,11 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
return this.queue.offer(element);
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
public Object removeLast() {
return this.queue.poll();
......
......
......@@ -18,13 +18,14 @@ package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
* @author Nils Christian Ehmke, Christian Wulf
*
* @since 1.0
*/
public final class RoundRobinStrategy2 implements IDistributorStrategy {
private int index = 0;
private int numWaits;
@Override
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
......@@ -33,9 +34,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
boolean success;
do {
OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts);
success = outputPort.send(element);
success = outputPort.sendNonBlocking(element);
numLoops--;
if (0 == numLoops) {
numWaits++;
// Thread.yield();
try {
Thread.sleep(1);
......@@ -59,4 +61,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
return outputPort;
}
public int getNumWaits() {
return numWaits;
}
}
......@@ -51,6 +51,11 @@ public class MergerTestingPipe implements IPipe {
return false;
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
public boolean isEmpty() {
// TODO Auto-generated method stub
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment