Skip to content
Snippets Groups Projects
Commit 53ff970a authored by Christian Wulf's avatar Christian Wulf
Browse files

added InterThreadPipe;

added RelayTestPipe;
parent 30e22232
No related branches found
No related tags found
No related merge requests found
Showing
with 111 additions and 67 deletions
......@@ -53,6 +53,7 @@ public abstract class AbstractStage implements StageWithPort {
outputPort.reportNewElement();
return true;
// return outputPort.send(element);
}
@SuppressWarnings("unchecked")
......
......@@ -4,19 +4,19 @@ import teetime.util.list.CommittableResizableArrayQueue;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Pipe<T> extends IntraThreadPipe<T> {
public final class CommittablePipe<T> extends IntraThreadPipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new Pipe<T>();
IPipe<T> pipe = new CommittablePipe<T>();
pipe.connectPorts(sourcePort, targetPort);
}
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#add(T)
*/
@Override
......@@ -28,7 +28,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#removeLast()
*/
@Override
......@@ -40,7 +40,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
*/
@Override
......@@ -50,7 +50,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#readLast()
*/
@Override
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicReference;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class InterThreadPipe<T> extends AbstractPipe<T> {
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
@Override
public void setSignal(final Signal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
}
public Signal getSignal() {
return this.signal.get();
}
@Override
public void reportNewElement() {
// do nothing
}
}
......@@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
private CircularArray<T> elements;
private int head;
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.util.ConstructorClosure;
public final class RelayTestPipe<T> extends InterThreadPipe<T> {
private int numInputObjects;
private final ConstructorClosure<T> inputObjectCreator;
public RelayTestPipe(final int numInputObjects,
final ConstructorClosure<T> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
@Override
public boolean add(final T element) {
return false;
}
@Override
public T removeLast() {
if (this.numInputObjects == 0) {
return null;
} else {
this.numInputObjects--;
return this.inputObjectCreator.create();
}
}
@Override
public boolean isEmpty() {
return (this.numInputObjects == 0);
}
@Override
public int size() {
return this.numInputObjects;
}
@Override
public T readLast() {
return null;
}
}
......@@ -3,10 +3,14 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class SingleElementPipe<T> extends IntraThreadPipe<T> {
public final class SingleElementPipe<T> extends IntraThreadPipe<T> {
private T element;
SingleElementPipe() {
super();
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new SingleElementPipe<T>();
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
......@@ -10,12 +9,10 @@ import org.jctools.queues.spec.Preference;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public class SpScPipe<T> extends AbstractPipe<T> {
public final class SpScPipe<T> extends InterThreadPipe<T> {
private final Queue<T> queue;
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
// statistics
private int numWaits;
......@@ -67,18 +64,4 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.numWaits;
}
@Override
public void setSignal(final Signal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
}
public Signal getSignal() {
return this.signal.get();
}
@Override
public void reportNewElement() {
// do nothing
}
}
......@@ -3,7 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
private final int MIN_CAPACITY;
......@@ -12,7 +12,7 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
private int lastFreeIndex;
@SuppressWarnings("unchecked")
public UnorderedGrowablePipe() {
UnorderedGrowablePipe() {
this.MIN_CAPACITY = 4;
this.elements = (T[]) new Object[this.MIN_CAPACITY];
}
......@@ -23,12 +23,6 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
}
@Override
public boolean add(final T element) {
if (this.lastFreeIndex == this.elements.length) {
......
......@@ -2,14 +2,14 @@ package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.InterThreadPipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
public class Relay<T> extends ProducerStage<T> {
private final InputPort<T> inputPort = this.createInputPort();
private SpScPipe<T> cachedCastedInputPipe;
private InterThreadPipe<T> cachedCastedInputPipe;
@Override
public void execute() {
......@@ -26,7 +26,7 @@ public class Relay<T> extends ProducerStage<T> {
@Override
public void onStarting() {
this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe();
this.cachedCastedInputPipe = (InterThreadPipe<T>) this.inputPort.getPipe();
super.onStarting();
}
......
......@@ -17,7 +17,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
public abstract class PerformanceTest {
protected static final int NUM_OBJECTS_TO_CREATE = 100000;
protected static final int NUM_OBJECTS_TO_CREATE = 1000000;
protected static final int NUM_NOOP_FILTERS = 800;
public static final MeasurementRepository measurementRepository = new MeasurementRepository();
......
......@@ -23,7 +23,7 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.CommittablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
......@@ -71,13 +71,13 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(collectorSink);
Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
CommittablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
CommittablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
Pipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
CommittablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
CommittablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
CommittablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return pipeline;
}
......
......@@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile {
System.out.println("speedupC: " + speedupC);
assertEquals(2, speedupB, 0.3);
assertEquals(3.5, speedupC, 0.3);
assertEquals(3.6, speedupC, 0.3);
}
}
......@@ -25,10 +25,10 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.framework.core.pipe.RelayTestPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
......@@ -110,6 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
super.init();
}
@SuppressWarnings("unchecked")
private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
......@@ -117,21 +118,19 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
Sink<TimestampObject> sink = new Sink<TimestampObject>();
Sink<Void> endStage = new Sink<Void>();
// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
distributor.getNewOutputPort().setPipe(new DummyPipe());
final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
// pipeline.setFirstStage(sink);
// pipeline.setFirstStage(endStage);
pipeline.setLastStage(distributor);
// pipeline.setLastStage(sink);
// pipeline.setLastStage(new EndStage<TimestampObject>());
// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
distributor.getNewOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>());
return pipeline;
}
......@@ -152,21 +151,11 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.setLastStage(collectorSink);
IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false, SPSC_INITIAL_CAPACITY);
relay.getInputPort().setPipe(pipe);
IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe();
for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.create());
}
// startPipe.close();
IPipe<TimestampObject> startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
startPipe.setSignal(new TerminatingSignal());
relay.getInputPort().setPipe(startPipe);
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
......@@ -174,6 +163,9 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.setLastStage(collectorSink);
return pipeline;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment