diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index f9b9e4ef17824c93de73606c796faa31b599e18d..b8c6bee5e013e55d4aa7c078153098dafcb5e2a1 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -145,7 +145,7 @@ public abstract class AnalysisConfiguration { return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity); } - protected <T> void connectStages(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { connections.add(new Pair<OutputPort, InputPort>(sourcePort, targetPort)); } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 2c3ac5232f885cba988919a38143978426a1e688..59092010e96fdb08cf4e783d8aae1143eb317313 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -88,7 +88,7 @@ public final class StageTester { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); - connectStages(producer.getOutputPort(), inputHolder.getPort()); + connectPorts(producer.getOutputPort(), inputHolder.getPort()); addThreadableStage(producer); } @@ -96,7 +96,7 @@ public final class StageTester { for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); - connectStages(outputHolder.getPort(), sink.getInputPort()); + connectPorts(outputHolder.getPort(), sink.getInputPort()); } } } diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index 8cef45c58212952d46d3c99c68472c0a85591cc9..95ba33f4357662d7014573bfe67f6f5d810582f7 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -40,12 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration { final CipherStage decrypt = new CipherStage(password, CipherMode.DECRYPT); final ByteArrayFileWriter writer = new ByteArrayFileWriter(output); - connectStages(init.getOutputPort(), f2b.getInputPort()); - connectStages(f2b.getOutputPort(), enc.getInputPort()); - connectStages(enc.getOutputPort(), comp.getInputPort()); - connectStages(comp.getOutputPort(), decomp.getInputPort()); - connectStages(decomp.getOutputPort(), decrypt.getInputPort()); - connectStages(decrypt.getOutputPort(), writer.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), enc.getInputPort()); + connectPorts(enc.getOutputPort(), comp.getInputPort()); + connectPorts(comp.getOutputPort(), decomp.getInputPort()); + connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); + connectPorts(decrypt.getOutputPort(), writer.getInputPort()); // this.getFiniteProducerStages().add(init); this.addThreadableStage(init); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index 32bec67fd86676965729ee340e4bcf1a2c5b9c34..15c325dad0a5cca3f7df333d092a8444ec16af55 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -43,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration { final Tokenizer tokenizer = new Tokenizer(" "); this.counter = new Counter<String>(); - connectStages(init.getOutputPort(), f2b.getInputPort()); - connectStages(f2b.getOutputPort(), decomp.getInputPort()); - connectStages(decomp.getOutputPort(), decrypt.getInputPort()); - connectStages(decrypt.getOutputPort(), b2s.getInputPort()); - connectStages(b2s.getOutputPort(), tokenizer.getInputPort()); - connectStages(tokenizer.getOutputPort(), this.counter.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), decomp.getInputPort()); + connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); + connectPorts(decrypt.getOutputPort(), b2s.getInputPort()); + connectPorts(b2s.getOutputPort(), tokenizer.getInputPort()); + connectPorts(tokenizer.getOutputPort(), this.counter.getInputPort()); this.addThreadableStage(init); } diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index 4c64c25f10dc077361542cca79fbd14650b3f0b2..5a5966374f1bedca9425e623ea82ef03efca7671 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -73,7 +73,7 @@ public class AnalysisTest { public TestConfig() { final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(DELAY_IN_MS); - connectStages(init.getOutputPort(), delay.getInputPort()); + connectPorts(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } @@ -114,7 +114,7 @@ public class AnalysisTest { public Sink<Object> sink = new Sink<Object>(); public AnalysisTestConfig(final boolean inter) { - connectStages(init.getOutputPort(), sink.getInputPort()); + connectPorts(init.getOutputPort(), sink.getInputPort()); addThreadableStage(init); if (inter) { addThreadableStage(sink); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 6e3be7ba565f796258a4d498cc331952fe4a4ca1..7dca2a99fc172c73a12e247aae3cd4693d67d0de 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -35,7 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); - connectStages(producer.getOutputPort(), collectorSink.getInputPort()); + connectPorts(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 4df2995e8abc27adfa1fd79f9e11d1a16d821158..19580dcb72e9573163c2c40755d4f18a45b60e4c 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -60,7 +60,7 @@ public class StageTest { public TestConfig() { init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(0); - connectStages(init.getOutputPort(), delay.getInputPort()); + connectPorts(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index f447dc204c2e3551b9d08ac44d482d0d6d9c7bd2..f895104874d57a005aa66e0fa6788c228b36747f 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -71,8 +71,8 @@ public class TraversorTest { // CountingMapMerger (already as field) // Connecting the stages of the first part of the config - connectStages(init.getOutputPort(), f2b.getInputPort()); - connectStages(f2b.getOutputPort(), distributor.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), distributor.getInputPort()); // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { @@ -80,15 +80,15 @@ public class TraversorTest { final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - connectStages(distributor.getNewOutputPort(), wc.getInputPort()); - connectStages(wc.getOutputPort(), merger.getNewInputPort()); + connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); + connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread addThreadableStage(wc); } // Connect the stages of the last part - connectStages(merger.getOutputPort(), result.getInputPort()); + connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages addThreadableStage(init); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 785aafd74220d69413bc7dbeefcdc219824455cd..191f8460e6f68a04007a05f7cc29338e3c2789f2 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -42,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { Clock clock = new Clock(); clock.setInitialDelayInMs(initialDelayInMs); - connectStages(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + connectPorts(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); return clock; } @@ -51,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); - connectStages(initialElementProducer.getOutputPort(), delay.getInputPort()); + connectPorts(initialElementProducer.getOutputPort(), delay.getInputPort()); return initialElementProducer; } @@ -62,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new WaitStrategy(relay)); - connectStages(delay.getOutputPort(), relay.getInputPort()); - connectStages(relay.getOutputPort(), collectorSink.getInputPort()); + connectPorts(delay.getOutputPort(), relay.getInputPort()); + connectPorts(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index 8c784d11414e01b398d946d8522fb2e5af8a9b88..a63025947a38b2db8f3e12e3bad6c1637d52fa8a 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -44,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new YieldStrategy()); - connectStages(producer.getOutputPort(), relay.getInputPort()); - connectStages(relay.getOutputPort(), collectorSink.getInputPort()); + connectPorts(producer.getOutputPort(), relay.getInputPort()); + connectPorts(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index ab1d2e2728f7a06cfdbf471e2961d8013afaf6b9..d985faf23a7c09a0eb2ee3e1727c289047a3f209 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -28,7 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { second = new ExceptionTestConsumerStage(); third = new ExceptionTestProducerStage(); - connectStages(first.getOutputPort(), second.getInputPort()); + connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); this.addThreadableStage(first); diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index 03e13bfc04fc8c890f9aaf418aad46e73b89bd9a..b87d2ec385f2918c6e364531a649fe37c5ac3427 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -130,9 +130,9 @@ public class InstanceOfFilterTest { CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>(); CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>(); - connectStages(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); - connectStages(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); - connectStages(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); + connectPorts(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); + connectPorts(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); + connectPorts(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); addThreadableStage(elementProducer); }