diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 4ece770476f6db94c3b217c611445600b1d4ddc4..2c3ac5232f885cba988919a38143978426a1e688 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -88,7 +88,7 @@ public final class StageTester { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); - connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort()); + connectStages(producer.getOutputPort(), inputHolder.getPort()); addThreadableStage(producer); } @@ -96,7 +96,7 @@ public final class StageTester { for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); - connectIntraThreads(outputHolder.getPort(), sink.getInputPort()); + connectStages(outputHolder.getPort(), sink.getInputPort()); } } } diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index fca97df2eca0b734b77004a5927bb8738f4dfff9..8cef45c58212952d46d3c99c68472c0a85591cc9 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -40,12 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration { final CipherStage decrypt = new CipherStage(password, CipherMode.DECRYPT); final ByteArrayFileWriter writer = new ByteArrayFileWriter(output); - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), enc.getInputPort()); - connectIntraThreads(enc.getOutputPort(), comp.getInputPort()); - connectIntraThreads(comp.getOutputPort(), decomp.getInputPort()); - connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); - connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort()); + connectStages(init.getOutputPort(), f2b.getInputPort()); + connectStages(f2b.getOutputPort(), enc.getInputPort()); + connectStages(enc.getOutputPort(), comp.getInputPort()); + connectStages(comp.getOutputPort(), decomp.getInputPort()); + connectStages(decomp.getOutputPort(), decrypt.getInputPort()); + connectStages(decrypt.getOutputPort(), writer.getInputPort()); // this.getFiniteProducerStages().add(init); this.addThreadableStage(init); diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index f9905c60a8fac7d41cb99320757d98a05dc6f8fa..0ebf719eda3f5148e746d5a2e514a14dea1da04b 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -71,7 +71,7 @@ public class AnalysisTest { public TestConfig() { final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(DELAY_IN_MS); - connectIntraThreads(init.getOutputPort(), delay.getInputPort()); + connectStages(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 892c3cd0de01cd336d150f66e418535b2d439ed1..6e3be7ba565f796258a4d498cc331952fe4a4ca1 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -35,7 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); - connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); + connectStages(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 5fcc1dfee8718ee9335dab311cb93bb363a3cc84..4df2995e8abc27adfa1fd79f9e11d1a16d821158 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -60,7 +60,7 @@ public class StageTest { public TestConfig() { init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(0); - connectIntraThreads(init.getOutputPort(), delay.getInputPort()); + connectStages(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 35a7826b5800186a72416d97e18a58f8662292b9..eaf4be71872531692654ccd7a4295b6232a823c5 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -23,7 +23,6 @@ import java.util.Set; import org.junit.Test; -import teetime.framework.pipe.IPipe; import teetime.stage.CountingMapMerger; import teetime.stage.InitialElementProducer; import teetime.stage.basic.distributor.Distributor; @@ -67,8 +66,8 @@ public class TraversorTest { // CountingMapMerger (already as field) // Connecting the stages of the first part of the config - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort()); + connectStages(init.getOutputPort(), f2b.getInputPort()); + connectStages(f2b.getOutputPort(), distributor.getInputPort()); // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { @@ -76,15 +75,15 @@ public class TraversorTest { final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000); - final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort()); + connectStages(distributor.getNewOutputPort(), wc.getInputPort()); + connectStages(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread addThreadableStage(wc); } // Connect the stages of the last part - connectIntraThreads(merger.getOutputPort(), result.getInputPort()); + connectStages(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages addThreadableStage(init); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 4d4dff162c62b5b719effb511beceff0148f32a1..785aafd74220d69413bc7dbeefcdc219824455cd 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -42,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { Clock clock = new Clock(); clock.setInitialDelayInMs(initialDelayInMs); - connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + connectStages(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); return clock; } @@ -51,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); - connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort()); + connectStages(initialElementProducer.getOutputPort(), delay.getInputPort()); return initialElementProducer; } @@ -62,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new WaitStrategy(relay)); - connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort()); - connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); + connectStages(delay.getOutputPort(), relay.getInputPort()); + connectStages(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index f3e48a4d32c21c1e8bf1c01087657593e4d54099..8c784d11414e01b398d946d8522fb2e5af8a9b88 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -44,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new YieldStrategy()); - connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort()); - connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); + connectStages(producer.getOutputPort(), relay.getInputPort()); + connectStages(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index a9a1d1dfbbea2638d1c5886566426c250f2f1a0e..ab1d2e2728f7a06cfdbf471e2961d8013afaf6b9 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -28,7 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { second = new ExceptionTestConsumerStage(); third = new ExceptionTestProducerStage(); - connectBoundedInterThreads(first.getOutputPort(), second.getInputPort()); + connectStages(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); this.addThreadableStage(first); diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index 8abd3018b5122f409e023e936f30dd6a1377892a..03e13bfc04fc8c890f9aaf418aad46e73b89bd9a 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -130,9 +130,9 @@ public class InstanceOfFilterTest { CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>(); CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>(); - connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); - connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); - connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); + connectStages(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); + connectStages(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); + connectStages(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); addThreadableStage(elementProducer); }