From 3349f88fe30cd35d22d62b7248ddac79c04c651e Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 18 Dec 2014 16:16:30 +0100 Subject: [PATCH] tried to fix perf test 16 --- .../framework/AbstractInterThreadPipe.java | 4 ++ .../framework/RunnableConsumerStage.java | 6 ++ src/main/java/teetime/framework/Stage.java | 2 +- src/main/java/teetime/stage/Relay.java | 32 +++++----- ...is16.java => AnalysisConfiguration16.java} | 58 ++++--------------- ...dCallThoughputTimestampAnalysis16Test.java | 12 ++-- src/test/resources/logback.xml | 3 +- 7 files changed, 47 insertions(+), 70 deletions(-) rename src/performancetest/java/teetime/examples/experiment16/{MethodCallThroughputAnalysis16.java => AnalysisConfiguration16.java} (81%) diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 847d10ef..10646a4a 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -21,11 +21,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { @Override public void sendSignal(final ISignal signal) { this.signalQueue.offer(signal); + System.out.println("send signal: " + signal + " to " + cachedTargetStage); Thread owningThread = cachedTargetStage.getOwningThread(); if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { owningThread.interrupt(); + System.out.println("interrupted " + owningThread); } + + System.out.println("Signal sent."); } /** diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index bfdefd7b..8a74b08f 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -21,6 +21,12 @@ public final class RunnableConsumerStage extends RunnableStage { @Override protected void beforeStageExecution() { // TODO wait for starting signal + do { + checkforSignals(); + // logger.trace("Signals checked."); + Thread.yield(); + } while (stage.getInputPorts().length == 0); + logger.debug("Stage initialized"); } @Override diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 96e16370..f709fc50 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -30,7 +30,7 @@ public abstract class Stage { protected Stage() { this.id = this.createId(); - this.logger = LoggerFactory.getLogger(this.id); + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); } /** diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 1840dac6..54f09ed8 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -1,29 +1,30 @@ package teetime.stage; -import teetime.framework.AbstractInterThreadPipe; -import teetime.framework.AbstractProducerStage; -import teetime.framework.InputPort; +import teetime.framework.AbstractConsumerStage; import teetime.framework.NotEnoughInputException; +import teetime.framework.OutputPort; -public final class Relay<T> extends AbstractProducerStage<T> { +public final class Relay<T> extends AbstractConsumerStage<T> { - private final InputPort<T> inputPort = this.createInputPort(); + // private final InputPort<T> inputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); - private AbstractInterThreadPipe cachedCastedInputPipe; + // private AbstractInterThreadPipe cachedCastedInputPipe; private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); @Override - public void execute() { - T element = this.inputPort.receive(); + protected void execute(final T element) { if (null == element) { // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { // this.terminate(); // } // Thread.yield(); // return; + logger.trace("relay: returnNoElement"); returnNoElement(); } + logger.trace("relay: " + element); outputPort.send(element); } @@ -31,13 +32,14 @@ public final class Relay<T> extends AbstractProducerStage<T> { throw NOT_ENOUGH_INPUT_EXCEPTION; } - @Override - public void onStarting() throws Exception { - super.onStarting(); - this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); - } + // @Override + // public void onStarting() throws Exception { + // super.onStarting(); + // this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); + // } - public InputPort<T> getInputPort() { - return this.inputPort; + public OutputPort<T> getOutputPort() { + return outputPort; } + } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java similarity index 81% rename from src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java rename to src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java index 5ea97ae6..f341bc27 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java @@ -21,8 +21,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableConsumerStage; -import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -43,7 +41,7 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { +class AnalysisConfiguration16 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); @@ -52,35 +50,31 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; - private int numNoopFilters; + private final int numNoopFilters; private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Thread producerThread; - - private Thread[] workerThreads; - private int numWorkerThreads; - public MethodCallThroughputAnalysis16() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) { + this.numWorkerThreads = numWorkerThreads; + this.numNoopFilters = numNoopFilters; + this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); } - public void init() { + public void build() { OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableProducerStage(producerPipeline)); + addThreadableStage(producerPipeline); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < numWorkerThreads; i++) { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(new RunnableConsumerStage(workerPipeline)); - workerPipeline.setOwningThread(this.workerThreads[i]); + addThreadableStage(workerPipeline); } } @@ -136,30 +130,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return pipeline; } - public void start() { - this.producerThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.producerThread.join(); - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - try { - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; @@ -169,10 +139,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return this.numNoopFilters; } - public void setNumNoopFilters(final int numNoopFilters) { - this.numNoopFilters = numNoopFilters; - } - public List<List<TimestampObject>> getTimestampObjectsList() { return this.timestampObjectsList; } @@ -181,8 +147,4 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java index f6b61ff9..73cdeb52 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java +++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java @@ -21,6 +21,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.framework.Analysis; import teetime.util.ConstructorClosure; import teetime.util.ListUtil; import teetime.util.TimestampObject; @@ -81,15 +82,16 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + NUM_NOOP_FILTERS + "..."); - final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16(); - analysis.setNumWorkerThreads(numThreads); - analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { + final AnalysisConfiguration16 configuration = new AnalysisConfiguration16(numThreads, NUM_NOOP_FILTERS); + configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { @Override public TimestampObject create() { return new TimestampObject(); } }); + configuration.build(); + + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -99,7 +101,7 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest this.stopWatch.end(); } - this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); + this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList()); } } diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 0c604109..40701535 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -20,7 +20,8 @@ </encoder> </appender> - <logger name="teetime.stage" level="INFO" /> + <logger name="teetime.framework" level="TRACE" /> + <logger name="teetime.stage" level="TRACE" /> <logger name="util" level="INFO" /> <root level="ERROR"> -- GitLab