Skip to content
Snippets Groups Projects
Commit 06feba96 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed analysis 16

parent 02db4ad2
No related branches found
No related tags found
No related merge requests found
......@@ -31,4 +31,4 @@
13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
14: 21,000 ns (spsc pipe)
16: 14,500 ns (with distributor thread)
17: 9800 ns (as 16, but with direct feeding of SpScPipe)
17: 8600 ns (as 16, but with direct feeding of SpScPipe)
......@@ -16,11 +16,11 @@
package teetime.examples.throughput;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.Closure;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis16;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
......@@ -51,9 +51,9 @@ public class MethodCallThoughputTimestampAnalysis16Test {
final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Closure<Void, TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
public TimestampObject execute(final Void element) {
return new TimestampObject();
}
});
......@@ -70,4 +70,10 @@ public class MethodCallThoughputTimestampAnalysis16Test {
List<TimestampObject> timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList());
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
public static void main(final String[] args) {
MethodCallThoughputTimestampAnalysis16Test test = new MethodCallThoughputTimestampAnalysis16Test();
test.before();
test.testWithManyObjects();
}
}
......@@ -2,5 +2,5 @@ package teetime.examples.throughput.methodcall;
public interface Closure<I, O> {
O execute(I element);
public abstract O execute(I element);
}
......@@ -18,7 +18,6 @@ package teetime.examples.throughput.methodcall;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
......@@ -41,7 +40,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private Closure<Void, TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
......@@ -54,7 +53,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
......@@ -78,8 +77,8 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline() {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
......@@ -146,7 +145,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
}
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
public void setInput(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
......
......@@ -87,7 +87,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
// this.producerThread.start();
// this.producerThread.run();
new RunnableStage(producerPipeline).run();
// new RunnableStage(producerPipeline).run();
// Pipeline<Void, TimestampObject> stage = producerPipeline;
// stage.onStart();
......
package teetime.examples.throughput.methodcall;
public class SchedulingInformation {
private boolean active = true;
public boolean isActive() {
return this.active;
}
public void setActive(final boolean active) {
this.active = active;
}
}
......@@ -12,7 +12,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private final OutputPort<O> outputPort = new OutputPort<O>();
// protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
protected final CommittableQueue<O> outputElements = null;
private final CommittableQueue<O> outputElements = null;
private Stage<?, ?> parentStage;
......
......@@ -6,7 +6,6 @@ import java.util.List;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.examples.throughput.methodcall.OutputPort;
import teetime.examples.throughput.methodcall.SchedulingInformation;
import teetime.examples.throughput.methodcall.Stage;
import teetime.examples.throughput.methodcall.StageWithPort;
import teetime.util.list.CommittableQueue;
......@@ -14,13 +13,11 @@ import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements StageWithPort<I, O> {
private StageWithPort<I, ?> firstStage;
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private StageWithPort[] stages;
private Stage parentStage;
private StageWithPort<?, ?>[] stages;
private Stage<?, ?> parentStage;
private int index;
private int startIndex;
......@@ -31,7 +28,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.firstStage = stage;
}
public void addIntermediateStages(final StageWithPort... stages) {
public void addIntermediateStages(final StageWithPort<?, ?>... stages) {
this.intermediateStages.addAll(Arrays.asList(stages));
}
......@@ -59,8 +56,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// queue = stage.execute2(queue);
// }
this.stages[0].execute2(elements);
this.setReschedulable(this.stages[0].isReschedulable());
this.firstStage.execute2(elements);
this.setReschedulable(this.firstStage.isReschedulable());
return queue;
}
......@@ -132,7 +129,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
for (int i = 0; i < this.stages.length; i++) {
StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(this, i);
// stage.setParentStage(this, i);
// stage.setListener(this);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment