Skip to content
Snippets Groups Projects
Commit d8fc64b3 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed perf test 19

parent 487bece8
No related branches found
No related tags found
No related merge requests found
...@@ -14,9 +14,7 @@ public class AnalysisConfiguration { ...@@ -14,9 +14,7 @@ public class AnalysisConfiguration {
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
public AnalysisConfiguration() {} List<Stage> getThreadableStageJobs() {
List<Stage> getThreadableStageJobs() {
return this.threadableStageJobs; return this.threadableStageJobs;
} }
......
...@@ -19,6 +19,7 @@ import org.junit.FixMethodOrder; ...@@ -19,6 +19,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
import teetime.framework.Analysis;
import teetime.util.ConstructorClosure; import teetime.util.ConstructorClosure;
import teetime.util.ListUtil; import teetime.util.ListUtil;
import teetime.util.TimestampObject; import teetime.util.TimestampObject;
...@@ -66,15 +67,16 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest ...@@ -66,15 +67,16 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "..."); + NUM_NOOP_FILTERS + "...");
final MethodCallThroughputAnalysis19 analysis = new MethodCallThroughputAnalysis19(); final MethodCallThroughputAnalysis19 configuration = new MethodCallThroughputAnalysis19(numThreads, NUM_NOOP_FILTERS);
analysis.setNumWorkerThreads(numThreads); configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
@Override @Override
public TimestampObject create() { public TimestampObject create() {
return new TimestampObject(); return new TimestampObject();
} }
}); });
configuration.build();
final Analysis analysis = new Analysis(configuration);
analysis.init(); analysis.init();
this.stopWatch.start(); this.stopWatch.start();
...@@ -84,7 +86,7 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest ...@@ -84,7 +86,7 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest
this.stopWatch.end(); this.stopWatch.end();
} }
this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList());
} }
} }
...@@ -19,8 +19,8 @@ import java.util.ArrayList; ...@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage;
import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.SpScPipe; import teetime.framework.pipe.SpScPipe;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
...@@ -30,6 +30,7 @@ import teetime.stage.Relay; ...@@ -30,6 +30,7 @@ import teetime.stage.Relay;
import teetime.stage.StartTimestampFilter; import teetime.stage.StartTimestampFilter;
import teetime.stage.StopTimestampFilter; import teetime.stage.StopTimestampFilter;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.EveryXthPrinter;
import teetime.util.ConstructorClosure; import teetime.util.ConstructorClosure;
import teetime.util.TimestampObject; import teetime.util.TimestampObject;
...@@ -38,39 +39,37 @@ import teetime.util.TimestampObject; ...@@ -38,39 +39,37 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class MethodCallThroughputAnalysis19 { public class MethodCallThroughputAnalysis19 extends AnalysisConfiguration {
private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int SPSC_INITIAL_CAPACITY = 100100;
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Thread producerThread; private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private Thread[] workerThreads; private final int numNoopFilters;
private int numWorkerThreads; private int numWorkerThreads;
public void init() { public MethodCallThroughputAnalysis19(final int numWorkerThreads, final int numNoopFilters) {
this.numWorkerThreads = numWorkerThreads;
this.numNoopFilters = numNoopFilters;
}
public void build() {
OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator); this.inputObjectCreator);
this.producerThread = new Thread(new RunnableProducerStage(producerPipeline)); addThreadableStage(producerPipeline);
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < numWorkerThreads; i++) {
for (int i = 0; i < this.workerThreads.length; i++) {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList); this.timestampObjectsList.add(resultList);
OldHeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList); OldHeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList);
this.workerThreads[i] = new Thread(new RunnableProducerStage(pipeline)); addThreadableStage(pipeline);
} }
} }
private OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, private OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
...@@ -97,6 +96,7 @@ public class MethodCallThroughputAnalysis19 { ...@@ -97,6 +96,7 @@ public class MethodCallThroughputAnalysis19 {
noopFilters[i] = new NoopFilter<TimestampObject>(); noopFilters[i] = new NoopFilter<TimestampObject>();
} }
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000);
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
...@@ -112,36 +112,12 @@ public class MethodCallThroughputAnalysis19 { ...@@ -112,36 +112,12 @@ public class MethodCallThroughputAnalysis19 {
OrderedGrowableArrayPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); OrderedGrowableArrayPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
} }
OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
OrderedGrowableArrayPipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
return pipeline; return pipeline;
} }
public void start() {
this.producerThread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
this.producerThread.join();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects; this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator; this.inputObjectCreator = inputObjectCreator;
...@@ -151,10 +127,6 @@ public class MethodCallThroughputAnalysis19 { ...@@ -151,10 +127,6 @@ public class MethodCallThroughputAnalysis19 {
return this.numNoopFilters; return this.numNoopFilters;
} }
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<List<TimestampObject>> getTimestampObjectsList() { public List<List<TimestampObject>> getTimestampObjectsList() {
return this.timestampObjectsList; return this.timestampObjectsList;
} }
...@@ -163,8 +135,4 @@ public class MethodCallThroughputAnalysis19 { ...@@ -163,8 +135,4 @@ public class MethodCallThroughputAnalysis19 {
return this.numWorkerThreads; return this.numWorkerThreads;
} }
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
} }
...@@ -42,11 +42,6 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte ...@@ -42,11 +42,6 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
this.lastStage.validateOutputPorts(invalidPortConnections); this.lastStage.validateOutputPorts(invalidPortConnections);
} }
@Override
public TerminationStrategy getTerminationStrategy() {
return firstStage.getTerminationStrategy();
}
@Override @Override
public void terminate() { public void terminate() {
firstStage.terminate(); firstStage.terminate();
...@@ -72,4 +67,9 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte ...@@ -72,4 +67,9 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
return firstStage.getOwningThread(); return firstStage.getOwningThread();
} }
@Override
public TerminationStrategy getTerminationStrategy() {
return firstStage.getTerminationStrategy();
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment