Skip to content
Snippets Groups Projects
Commit 4749cca9 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'interthread-signal' into 'master'

Interthread signal

solved issue #45

See merge request !8
parents b043b877 66659f8f
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@ public final class OutputPort<T> extends AbstractPort<T> {
}
public void sendSignal(final ISignal signal) {
this.pipe.setSignal(signal);
this.pipe.sendSignal(signal);
}
public void reportNewElement() {
......
......@@ -44,7 +44,7 @@ public final class DummyPipe implements IPipe {
}
@Override
public void setSignal(final ISignal signal) {}
public void sendSignal(final ISignal signal) {}
@Override
public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
......
......@@ -18,7 +18,7 @@ public interface IPipe {
InputPort<?> getTargetPort();
void setSignal(ISignal signal);
void sendSignal(ISignal signal);
@Deprecated
<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
......
package teetime.framework.pipe;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Queue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
......@@ -8,19 +13,24 @@ import teetime.framework.signal.ISignal;
public abstract class InterThreadPipe extends AbstractPipe {
private final AtomicReference<ISignal> signal = new AtomicReference<ISignal>();
private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
<T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Override
public void setSignal(final ISignal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
public void sendSignal(final ISignal signal) {
this.signalQueue.offer(signal);
}
/**
* Retrieves and removes the head of the signal queue
*
* @return Head of signal queue, <code>null</code> if signal queue is empty.
*/
public ISignal getSignal() {
return this.signal.get();
return this.signalQueue.poll();
}
@Override
......
......@@ -11,7 +11,7 @@ public abstract class IntraThreadPipe extends AbstractPipe {
}
@Override
public void setSignal(final ISignal signal) {
public void sendSignal(final ISignal signal) {
if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts
this.cachedTargetStage.onSignal(signal, this.getTargetPort());
}
......
package teetime.stage;
import teetime.framework.ProducerStage;
public class IterableProducer<O extends Iterable<T>, T> extends ProducerStage<T> {
private O iter = null;
public IterableProducer(final O iter) {
this.iter = iter;
}
@Override
protected void execute() {
for (T i : iter) {
this.send(this.outputPort, i);
}
}
}
......@@ -149,7 +149,7 @@ public class MethodCallThroughputAnalysis17 {
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
startPipe.setSignal(new TerminatingSignal());
startPipe.sendSignal(new TerminatingSignal());
relay.getInputPort().setPipe(startPipe);
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
package teetime.framework.pipe;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal;
public class SpScPipeTest {
@Test
public void testSignalOrdering() throws Exception {
OutputPort<? extends Object> sourcePort = null;
InputPort<Object> targetPort = null;
InterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
List<ISignal> list = new ArrayList<ISignal>();
list.add(new StartingSignal());
list.add(new TerminatingSignal());
list.add(new ValidatingSignal());
list.add(new StartingSignal());
list.add(new TerminatingSignal());
list.add(new ValidatingSignal());
list.add(new StartingSignal());
list.add(new TerminatingSignal());
list.add(new ValidatingSignal());
for (ISignal s : list) {
pipe.sendSignal(s);
}
List<ISignal> secondList = new ArrayList<ISignal>();
while (true) {
ISignal temp = pipe.getSignal();
if (temp == null) {
break;
}
secondList.add(temp);
}
Assert.assertEquals(list, secondList);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment