diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 847d10ef6ea868cceaa0d60ea5b1db70061784c1..10646a4a966e7e767a3bc05a80ccf2c02f656fe0 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -21,11 +21,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { @Override public void sendSignal(final ISignal signal) { this.signalQueue.offer(signal); + System.out.println("send signal: " + signal + " to " + cachedTargetStage); Thread owningThread = cachedTargetStage.getOwningThread(); if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) { owningThread.interrupt(); + System.out.println("interrupted " + owningThread); } + + System.out.println("Signal sent."); } /** diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index bfdefd7b9d49b6aa826091b95db7c5131e16e802..8a74b08fb2db13ca7e0aa8190cf06b62dfa43eb2 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -21,6 +21,12 @@ public final class RunnableConsumerStage extends RunnableStage { @Override protected void beforeStageExecution() { // TODO wait for starting signal + do { + checkforSignals(); + // logger.trace("Signals checked."); + Thread.yield(); + } while (stage.getInputPorts().length == 0); + logger.debug("Stage initialized"); } @Override diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 96e16370f82bdb3bcfd82f25ab19886b5450127b..f709fc50d11f7a434cef194be25f9b31380bde89 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -30,7 +30,7 @@ public abstract class Stage { protected Stage() { this.id = this.createId(); - this.logger = LoggerFactory.getLogger(this.id); + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); } /** diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 1840dac654789144f2afc44848ce940f6d2a7487..54f09ed8ff322a0e40c4020083620e2c815f2c23 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -1,29 +1,30 @@ package teetime.stage; -import teetime.framework.AbstractInterThreadPipe; -import teetime.framework.AbstractProducerStage; -import teetime.framework.InputPort; +import teetime.framework.AbstractConsumerStage; import teetime.framework.NotEnoughInputException; +import teetime.framework.OutputPort; -public final class Relay<T> extends AbstractProducerStage<T> { +public final class Relay<T> extends AbstractConsumerStage<T> { - private final InputPort<T> inputPort = this.createInputPort(); + // private final InputPort<T> inputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); - private AbstractInterThreadPipe cachedCastedInputPipe; + // private AbstractInterThreadPipe cachedCastedInputPipe; private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); @Override - public void execute() { - T element = this.inputPort.receive(); + protected void execute(final T element) { if (null == element) { // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { // this.terminate(); // } // Thread.yield(); // return; + logger.trace("relay: returnNoElement"); returnNoElement(); } + logger.trace("relay: " + element); outputPort.send(element); } @@ -31,13 +32,14 @@ public final class Relay<T> extends AbstractProducerStage<T> { throw NOT_ENOUGH_INPUT_EXCEPTION; } - @Override - public void onStarting() throws Exception { - super.onStarting(); - this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); - } + // @Override + // public void onStarting() throws Exception { + // super.onStarting(); + // this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); + // } - public InputPort<T> getInputPort() { - return this.inputPort; + public OutputPort<T> getOutputPort() { + return outputPort; } + } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java similarity index 81% rename from src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java rename to src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java index 5ea97ae65635c950bd1ecdea8b9b76bd95d3824e..f341bc27065a689a7daba8870cfd0e9a23888422 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java @@ -21,8 +21,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableConsumerStage; -import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -43,7 +41,7 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { +class AnalysisConfiguration16 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); @@ -52,35 +50,31 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; - private int numNoopFilters; + private final int numNoopFilters; private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Thread producerThread; - - private Thread[] workerThreads; - private int numWorkerThreads; - public MethodCallThroughputAnalysis16() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) { + this.numWorkerThreads = numWorkerThreads; + this.numNoopFilters = numNoopFilters; + this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); } - public void init() { + public void build() { OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableProducerStage(producerPipeline)); + addThreadableStage(producerPipeline); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < numWorkerThreads; i++) { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(new RunnableConsumerStage(workerPipeline)); - workerPipeline.setOwningThread(this.workerThreads[i]); + addThreadableStage(workerPipeline); } } @@ -136,30 +130,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return pipeline; } - public void start() { - this.producerThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.producerThread.join(); - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - try { - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; @@ -169,10 +139,6 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return this.numNoopFilters; } - public void setNumNoopFilters(final int numNoopFilters) { - this.numNoopFilters = numNoopFilters; - } - public List<List<TimestampObject>> getTimestampObjectsList() { return this.timestampObjectsList; } @@ -181,8 +147,4 @@ public class MethodCallThroughputAnalysis16 extends AnalysisConfiguration { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java index f6b61ff996bb0c27eeac2da4683ba3813b3cd586..73cdeb5275bef1e9ff98bb5043c25793e38e113d 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java +++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java @@ -21,6 +21,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.framework.Analysis; import teetime.util.ConstructorClosure; import teetime.util.ListUtil; import teetime.util.TimestampObject; @@ -81,15 +82,16 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + NUM_NOOP_FILTERS + "..."); - final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16(); - analysis.setNumWorkerThreads(numThreads); - analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { + final AnalysisConfiguration16 configuration = new AnalysisConfiguration16(numThreads, NUM_NOOP_FILTERS); + configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { @Override public TimestampObject create() { return new TimestampObject(); } }); + configuration.build(); + + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -99,7 +101,7 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest this.stopWatch.end(); } - this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); + this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList()); } } diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 0c6041095f144317b7397cdd45a4bd82c385a8d1..407015358bddf280cf340b753af45988394ff8b8 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -20,7 +20,8 @@ </encoder> </appender> - <logger name="teetime.stage" level="INFO" /> + <logger name="teetime.framework" level="TRACE" /> + <logger name="teetime.stage" level="TRACE" /> <logger name="util" level="INFO" /> <root level="ERROR">