diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 82e8152cc347cce60b5dbf844f9911188632b0d6..db99b83be562eece6c4d925de1e667bc96b1a0e1 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -18,7 +18,11 @@ package teetime.framework; import java.util.LinkedList; import java.util.List; +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; /** * Represents a configuration of connected stages, which is needed to run a analysis. @@ -26,11 +30,17 @@ import teetime.framework.pipe.PipeFactoryRegistry; */ public abstract class AnalysisConfiguration { + private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); + private final IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + private final IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); + private final IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + /** * Can be used by subclasses, to obtain pipe factories */ + @Deprecated + // TODO: set private protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; - private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); List<Stage> getThreadableStageJobs() { return this.threadableStageJobs; @@ -40,10 +50,30 @@ public abstract class AnalysisConfiguration { * Execute this method, to add a stage to the configuration, which should be executed in a own thread. * * @param stage - * A arbitrary stage, which will be added to the configuration und executed in a thread. + * A arbitrary stage, which will be added to the configuration and executed in a thread. */ protected void addThreadableStage(final Stage stage) { this.threadableStageJobs.add(stage); } + protected <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return intraThreadFactory.create(sourcePort, targetPort); + } + + protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return interBoundedThreadFactory.create(sourcePort, targetPort); + } + + protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return interUnboundedThreadFactory.create(sourcePort, targetPort); + } + + protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return interBoundedThreadFactory.create(sourcePort, targetPort, capacity); + } + + protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity); + } + } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 4d3cf95e78314b784402db6fb3b6134c7c683148..29b64eeb1bbd27c0da5bdb20d1554e999e363b0c 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -23,9 +23,6 @@ import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; import teetime.framework.Stage; import teetime.framework.StageState; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.IterableProducer; @@ -80,22 +77,19 @@ public final class StageTester { private final class Configuration extends AnalysisConfiguration { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { - final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); for (InputHolder<?> inputHolder : inputHolders) { final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput()); - interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort()); + connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort()); addThreadableStage(producer); } addThreadableStage(stage); - final IPipeFactory intraPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); - intraPipeFactory.create(outputHolder.getPort(), sink.getInputPort()); + connectIntraThreads(outputHolder.getPort(), sink.getInputPort()); } } - } } diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index be73f9db478370dedc30e91069d85486a38c90f1..51586ffbc451af8bdd86cf291df374c99bd360a2 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -21,10 +21,7 @@ import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.OrderedGrowableArrayPipe; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -47,8 +44,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 4; - private final IPipeFactory intraThreadPipeFactory; - private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; @@ -58,10 +53,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { private Runnable runnable; private Clock clock; - public MethodCallThroughputAnalysis15() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - } - public void init() { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); this.clockRunnable = new RunnableProducerStage(clockPipeline); @@ -107,15 +98,15 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); - intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + connectIntraThreads(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); - intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort()); + connectIntraThreads(delay.getOutputPort(), collectorSink.getInputPort()); return pipeline; } diff --git a/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java index feeb708ca8f7d37437744885acae54a84776f66c..396cce7abdc3cecea77113e780c43787710a3422 100644 --- a/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java +++ b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java @@ -21,9 +21,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -46,8 +43,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); - private final IPipeFactory intraThreadPipeFactory; - private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private final int numNoopFilters; @@ -59,7 +54,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration { public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) { this.numWorkerThreads = numWorkerThreads; this.numNoopFilters = numNoopFilters; - this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); } public void build() { @@ -87,7 +81,7 @@ class AnalysisConfiguration16 extends AnalysisConfiguration { pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); - intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } @@ -117,15 +111,15 @@ class AnalysisConfiguration16 extends AnalysisConfiguration { SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); - intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort()); + connectIntraThreads(relay.getOutputPort(), startTimestampFilter.getInputPort()); - intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); - intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); + connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + connectIntraThreads(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + connectIntraThreads(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); return pipeline; } diff --git a/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java index 65823155b48ffdc755fc8ce0738486fabc62372b..3978b766fae83c61cec12a5948e95cbc5543ab60 100644 --- a/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java @@ -16,17 +16,13 @@ package teetime.examples.loopStage; import teetime.framework.AnalysisConfiguration; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class LoopStageAnalysisConfiguration extends AnalysisConfiguration { public LoopStageAnalysisConfiguration() { Countdown countdown = new Countdown(10); - IPipeFactory factory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.QUEUE_BASED, true); - factory.create(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort()); + connectIntraThreads(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort()); // this.getFiniteProducerStages().add(countdown); this.addThreadableStage(countdown); diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index 5531ef8c14c79634fa1cb299a7de6f7b1191b342..64dbbb0bd3918397ec0c31134dca9346bc116575 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -18,9 +18,6 @@ package teetime.examples.cipher; import java.io.File; import teetime.framework.AnalysisConfiguration; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CipherByteArray; import teetime.stage.CipherByteArray.CipherMode; import teetime.stage.InitialElementProducer; @@ -43,14 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration { final CipherByteArray decrypt = new CipherByteArray(password, CipherMode.DECRYPT); final ByteArrayFileWriter writer = new ByteArrayFileWriter(output); - final IPipeFactory intraFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - - intraFactory.create(init.getOutputPort(), f2b.getInputPort()); - intraFactory.create(f2b.getOutputPort(), enc.getInputPort()); - intraFactory.create(enc.getOutputPort(), comp.getInputPort()); - intraFactory.create(comp.getOutputPort(), decomp.getInputPort()); - intraFactory.create(decomp.getOutputPort(), decrypt.getInputPort()); - intraFactory.create(decrypt.getOutputPort(), writer.getInputPort()); + connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); + connectIntraThreads(f2b.getOutputPort(), enc.getInputPort()); + connectIntraThreads(enc.getOutputPort(), comp.getInputPort()); + connectIntraThreads(comp.getOutputPort(), decomp.getInputPort()); + connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); + connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort()); // this.getFiniteProducerStages().add(init); this.addThreadableStage(init); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index 5d00b8f672b5debf1f1422a6aaa999a8aac254a5..129cdd4fca5665eebcb832534c024ae8ad5d4b0f 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -18,9 +18,6 @@ package teetime.examples.tokenizer; import java.io.File; import teetime.framework.AnalysisConfiguration; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.ByteArray2String; import teetime.stage.CipherByteArray; import teetime.stage.CipherByteArray.CipherMode; @@ -33,7 +30,6 @@ import teetime.stage.string.Tokenizer; public class TokenizerConfiguration extends AnalysisConfiguration { - private static final IPipeFactory INTRA_PIPE_FACTORY = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); private final Counter<String> counter; public TokenizerConfiguration(final String inputFile, final String password) { @@ -47,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration { final Tokenizer tokenizer = new Tokenizer(" "); this.counter = new Counter<String>(); - INTRA_PIPE_FACTORY.create(init.getOutputPort(), f2b.getInputPort()); - INTRA_PIPE_FACTORY.create(f2b.getOutputPort(), decomp.getInputPort()); - INTRA_PIPE_FACTORY.create(decomp.getOutputPort(), decrypt.getInputPort()); - INTRA_PIPE_FACTORY.create(decrypt.getOutputPort(), b2s.getInputPort()); - INTRA_PIPE_FACTORY.create(b2s.getOutputPort(), tokenizer.getInputPort()); - INTRA_PIPE_FACTORY.create(tokenizer.getOutputPort(), this.counter.getInputPort()); + connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); + connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort()); + connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); + connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort()); + connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort()); + connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort()); this.addThreadableStage(init); } diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index 61929e185cdd5ef58826e36f7f63f218b21a22ae..f9905c60a8fac7d41cb99320757d98a05dc6f8fa 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -25,9 +25,6 @@ import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.InitialElementProducer; import teetime.util.StopWatch; @@ -69,13 +66,12 @@ public class AnalysisTest { } private static class TestConfig extends AnalysisConfiguration { - final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); public final DelayAndTerminate delay; public TestConfig() { final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(DELAY_IN_MS); - intraFact.create(init.getOutputPort(), delay.getInputPort()); + connectIntraThreads(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index d542c2d3b41019d4f7bd71505897136d73fff85d..892c3cd0de01cd336d150f66e418535b2d439ed1 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -18,9 +18,6 @@ package teetime.framework; import java.util.ArrayList; import java.util.List; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,8 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); - IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - pipeFactory.create(producer.getOutputPort(), collectorSink.getInputPort()); + connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index ea66d118ff5989d312cf2f84d9ebaaf019ea1b55..5fcc1dfee8718ee9335dab311cb93bb363a3cc84 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -23,9 +23,6 @@ import static org.junit.Assert.assertThat; import org.junit.Assert; import org.junit.Test; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Cache; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; @@ -57,14 +54,13 @@ public class StageTest { } private static class TestConfig extends AnalysisConfiguration { - final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); public final DelayAndTerminate delay; public InitialElementProducer<String> init; public TestConfig() { init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(0); - intraFact.create(init.getOutputPort(), delay.getInputPort()); + connectIntraThreads(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 14d168e13f61e524582f77c2c7b01c6b733b0b9d..9f4ef25077219463a7ea4c4df72ba4082b89f94d 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -24,9 +24,6 @@ import java.util.Set; import org.junit.Test; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CountingMapMerger; import teetime.stage.InitialElementProducer; import teetime.stage.basic.distributor.Distributor; @@ -69,13 +66,9 @@ public class TraversorTest { final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); // CountingMapMerger (already as field) - // PipeFactory instaces for intra- and inter-thread communication - final IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - // Connecting the stages of the first part of the config - intraFact.create(init.getOutputPort(), f2b.getInputPort()); - intraFact.create(f2b.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); + connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort()); // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { @@ -83,15 +76,15 @@ public class TraversorTest { final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000); - final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort()); + final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000); + final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread addThreadableStage(wc); } // Connect the stages of the last part - intraFact.create(merger.getOutputPort(), result.getInputPort()); + connectIntraThreads(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages addThreadableStage(init); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 269ce5bbf2a93bf23a20cf1e0b4efb3fb8489dea..4d4dff162c62b5b719effb511beceff0148f32a1 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -15,9 +15,6 @@ */ package teetime.framework; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -26,15 +23,10 @@ import teetime.stage.basic.Delay; class WaitStrategyConfiguration extends AnalysisConfiguration { - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; - private Delay<Object> delay; private CollectorSink<Object> collectorSink; public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); Stage producer = buildProducer(elements); addThreadableStage(producer); @@ -50,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { Clock clock = new Clock(); clock.setInitialDelayInMs(initialDelayInMs); - interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); return clock; } @@ -59,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); - intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort()); + connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort()); return initialElementProducer; } @@ -70,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new WaitStrategy(relay)); - interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort()); - intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort()); + connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index 3802a2c5bd2cb84031eaf04abffe620391b002d3..f3e48a4d32c21c1e8bf1c01087657593e4d54099 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -15,22 +15,15 @@ */ package teetime.framework; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.Relay; class YieldStrategyConfiguration extends AnalysisConfiguration { - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; private CollectorSink<Object> collectorSink; public YieldStrategyConfiguration(final Object... elements) { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); InitialElementProducer<Object> producer = buildProducer(elements); addThreadableStage(producer); @@ -51,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new YieldStrategy()); - interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort()); - intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort()); + connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java index 4d23197481984f38ffcdf3793732e60ffc4477ee..1dc02f2b55cfaf73c64b4a87956be187807520d3 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java @@ -60,6 +60,7 @@ public class ExceptionHandlingTest { exceptionArised = true; } assertTrue(exceptionArised); + exceptionArised = false; try { terminatesAllStages(); diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index 95a6c8d33cec6e1e10aaa03e0d8e0e2666bb5747..a9a1d1dfbbea2638d1c5886566426c250f2f1a0e 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -16,8 +16,6 @@ package teetime.framework.exceptionHandling; import teetime.framework.AnalysisConfiguration; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class ExceptionTestConfiguration extends AnalysisConfiguration { @@ -30,8 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { second = new ExceptionTestConsumerStage(); third = new ExceptionTestProducerStage(); - PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false) - .create(first.getOutputPort(), second.getInputPort()); + connectBoundedInterThreads(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); this.addThreadableStage(first); diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index 4a85cf416b70634a57ec6dc49c4cf4b0fc160367..186ce0c44fe7017f5bfd2be494f67d539f254db0 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -32,9 +32,6 @@ import org.junit.Test; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisException; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.util.Pair; /** @@ -127,17 +124,15 @@ public class InstanceOfFilterTest { private static class InstanceOfFilterTestConfig extends AnalysisConfiguration { - private final IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - public InstanceOfFilterTestConfig() { InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>(); InstanceOfFilter<Object, Clazz> instanceOfFilter = new InstanceOfFilter<Object, Clazz>(Clazz.class); CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>(); CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>(); - pipeFactory.create(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); - pipeFactory.create(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); - pipeFactory.create(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); + connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); + connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); + connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); addThreadableStage(elementProducer); }