Skip to content
Snippets Groups Projects
Commit 3349f88f authored by Christian Wulf's avatar Christian Wulf
Browse files

tried to fix perf test 16

parent 6ab592c7
No related branches found
No related tags found
No related merge requests found
...@@ -21,11 +21,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -21,11 +21,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
@Override @Override
public void sendSignal(final ISignal signal) { public void sendSignal(final ISignal signal) {
this.signalQueue.offer(signal); this.signalQueue.offer(signal);
System.out.println("send signal: " + signal + " to " + cachedTargetStage);
Thread owningThread = cachedTargetStage.getOwningThread(); Thread owningThread = cachedTargetStage.getOwningThread();
if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) {
owningThread.interrupt(); owningThread.interrupt();
System.out.println("interrupted " + owningThread);
} }
System.out.println("Signal sent.");
} }
/** /**
......
...@@ -21,6 +21,12 @@ public final class RunnableConsumerStage extends RunnableStage { ...@@ -21,6 +21,12 @@ public final class RunnableConsumerStage extends RunnableStage {
@Override @Override
protected void beforeStageExecution() { protected void beforeStageExecution() {
// TODO wait for starting signal // TODO wait for starting signal
do {
checkforSignals();
// logger.trace("Signals checked.");
Thread.yield();
} while (stage.getInputPorts().length == 0);
logger.debug("Stage initialized");
} }
@Override @Override
......
...@@ -30,7 +30,7 @@ public abstract class Stage { ...@@ -30,7 +30,7 @@ public abstract class Stage {
protected Stage() { protected Stage() {
this.id = this.createId(); this.id = this.createId();
this.logger = LoggerFactory.getLogger(this.id); this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id);
} }
/** /**
......
package teetime.stage; package teetime.stage;
import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractConsumerStage;
import teetime.framework.AbstractProducerStage;
import teetime.framework.InputPort;
import teetime.framework.NotEnoughInputException; import teetime.framework.NotEnoughInputException;
import teetime.framework.OutputPort;
public final class Relay<T> extends AbstractProducerStage<T> { public final class Relay<T> extends AbstractConsumerStage<T> {
private final InputPort<T> inputPort = this.createInputPort(); // private final InputPort<T> inputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private AbstractInterThreadPipe cachedCastedInputPipe; // private AbstractInterThreadPipe cachedCastedInputPipe;
private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException();
@Override @Override
public void execute() { protected void execute(final T element) {
T element = this.inputPort.receive();
if (null == element) { if (null == element) {
// if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) {
// this.terminate(); // this.terminate();
// } // }
// Thread.yield(); // Thread.yield();
// return; // return;
logger.trace("relay: returnNoElement");
returnNoElement(); returnNoElement();
} }
logger.trace("relay: " + element);
outputPort.send(element); outputPort.send(element);
} }
...@@ -31,13 +32,14 @@ public final class Relay<T> extends AbstractProducerStage<T> { ...@@ -31,13 +32,14 @@ public final class Relay<T> extends AbstractProducerStage<T> {
throw NOT_ENOUGH_INPUT_EXCEPTION; throw NOT_ENOUGH_INPUT_EXCEPTION;
} }
@Override // @Override
public void onStarting() throws Exception { // public void onStarting() throws Exception {
super.onStarting(); // super.onStarting();
this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); // this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe();
} // }
public InputPort<T> getInputPort() { public OutputPort<T> getOutputPort() {
return this.inputPort; return outputPort;
} }
} }
...@@ -21,8 +21,6 @@ import java.util.List; ...@@ -21,8 +21,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableConsumerStage;
import teetime.framework.RunnableProducerStage;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
...@@ -43,7 +41,7 @@ import teetime.util.TimestampObject; ...@@ -43,7 +41,7 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { class AnalysisConfiguration16 extends AnalysisConfiguration {
private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int SPSC_INITIAL_CAPACITY = 100100;
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
...@@ -52,35 +50,31 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { ...@@ -52,35 +50,31 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration {
private int numInputObjects; private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator; private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters; private final int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Thread producerThread;
private Thread[] workerThreads;
private int numWorkerThreads; private int numWorkerThreads;
public MethodCallThroughputAnalysis16() { public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); this.numWorkerThreads = numWorkerThreads;
this.numNoopFilters = numNoopFilters;
this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
} }
public void init() { public void build() {
OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator); this.inputObjectCreator);
this.producerThread = new Thread(new RunnableProducerStage(producerPipeline)); addThreadableStage(producerPipeline);
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < numWorkerThreads; i++) {
for (int i = 0; i < this.workerThreads.length; i++) {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList); this.timestampObjectsList.add(resultList);
OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(new RunnableConsumerStage(workerPipeline)); addThreadableStage(workerPipeline);
workerPipeline.setOwningThread(this.workerThreads[i]);
} }
} }
...@@ -136,30 +130,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { ...@@ -136,30 +130,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration {
return pipeline; return pipeline;
} }
public void start() {
this.producerThread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
this.producerThread.join();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects; this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator; this.inputObjectCreator = inputObjectCreator;
...@@ -169,10 +139,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { ...@@ -169,10 +139,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration {
return this.numNoopFilters; return this.numNoopFilters;
} }
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<List<TimestampObject>> getTimestampObjectsList() { public List<List<TimestampObject>> getTimestampObjectsList() {
return this.timestampObjectsList; return this.timestampObjectsList;
} }
...@@ -181,8 +147,4 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { ...@@ -181,8 +147,4 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration {
return this.numWorkerThreads; return this.numWorkerThreads;
} }
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
} }
...@@ -21,6 +21,7 @@ import org.junit.FixMethodOrder; ...@@ -21,6 +21,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
import teetime.framework.Analysis;
import teetime.util.ConstructorClosure; import teetime.util.ConstructorClosure;
import teetime.util.ListUtil; import teetime.util.ListUtil;
import teetime.util.TimestampObject; import teetime.util.TimestampObject;
...@@ -81,15 +82,16 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest ...@@ -81,15 +82,16 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "..."); + NUM_NOOP_FILTERS + "...");
final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16(); final AnalysisConfiguration16 configuration = new AnalysisConfiguration16(numThreads, NUM_NOOP_FILTERS);
analysis.setNumWorkerThreads(numThreads); configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
@Override @Override
public TimestampObject create() { public TimestampObject create() {
return new TimestampObject(); return new TimestampObject();
} }
}); });
configuration.build();
final Analysis analysis = new Analysis(configuration);
analysis.init(); analysis.init();
this.stopWatch.start(); this.stopWatch.start();
...@@ -99,7 +101,7 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest ...@@ -99,7 +101,7 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest
this.stopWatch.end(); this.stopWatch.end();
} }
this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList());
} }
} }
...@@ -20,7 +20,8 @@ ...@@ -20,7 +20,8 @@
</encoder> </encoder>
</appender> </appender>
<logger name="teetime.stage" level="INFO" /> <logger name="teetime.framework" level="TRACE" />
<logger name="teetime.stage" level="TRACE" />
<logger name="util" level="INFO" /> <logger name="util" level="INFO" />
<root level="ERROR"> <root level="ERROR">
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment