Skip to content
Snippets Groups Projects
Commit 6b68248e authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

refactored tests; all work but one... but that's logical consequence of

the changes
parent 4e8f8745
No related branches found
No related tags found
No related merge requests found
Showing with 26 additions and 27 deletions
...@@ -88,7 +88,7 @@ public final class StageTester { ...@@ -88,7 +88,7 @@ public final class StageTester {
public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) {
for (InputHolder<?> inputHolder : inputHolders) { for (InputHolder<?> inputHolder : inputHolders) {
final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput());
connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort()); connectStages(producer.getOutputPort(), inputHolder.getPort());
addThreadableStage(producer); addThreadableStage(producer);
} }
...@@ -96,7 +96,7 @@ public final class StageTester { ...@@ -96,7 +96,7 @@ public final class StageTester {
for (OutputHolder<?> outputHolder : outputHolders) { for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
connectIntraThreads(outputHolder.getPort(), sink.getInputPort()); connectStages(outputHolder.getPort(), sink.getInputPort());
} }
} }
} }
......
...@@ -40,12 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration { ...@@ -40,12 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration {
final CipherStage decrypt = new CipherStage(password, CipherMode.DECRYPT); final CipherStage decrypt = new CipherStage(password, CipherMode.DECRYPT);
final ByteArrayFileWriter writer = new ByteArrayFileWriter(output); final ByteArrayFileWriter writer = new ByteArrayFileWriter(output);
connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); connectStages(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), enc.getInputPort()); connectStages(f2b.getOutputPort(), enc.getInputPort());
connectIntraThreads(enc.getOutputPort(), comp.getInputPort()); connectStages(enc.getOutputPort(), comp.getInputPort());
connectIntraThreads(comp.getOutputPort(), decomp.getInputPort()); connectStages(comp.getOutputPort(), decomp.getInputPort());
connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); connectStages(decomp.getOutputPort(), decrypt.getInputPort());
connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort()); connectStages(decrypt.getOutputPort(), writer.getInputPort());
// this.getFiniteProducerStages().add(init); // this.getFiniteProducerStages().add(init);
this.addThreadableStage(init); this.addThreadableStage(init);
......
...@@ -71,7 +71,7 @@ public class AnalysisTest { ...@@ -71,7 +71,7 @@ public class AnalysisTest {
public TestConfig() { public TestConfig() {
final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello");
delay = new DelayAndTerminate(DELAY_IN_MS); delay = new DelayAndTerminate(DELAY_IN_MS);
connectIntraThreads(init.getOutputPort(), delay.getInputPort()); connectStages(init.getOutputPort(), delay.getInputPort());
addThreadableStage(init); addThreadableStage(init);
} }
} }
......
...@@ -35,7 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio ...@@ -35,7 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink); addThreadableStage(collectorSink);
connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); connectStages(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink; this.collectorSink = collectorSink;
} }
......
...@@ -60,7 +60,7 @@ public class StageTest { ...@@ -60,7 +60,7 @@ public class StageTest {
public TestConfig() { public TestConfig() {
init = new InitialElementProducer<String>("Hello"); init = new InitialElementProducer<String>("Hello");
delay = new DelayAndTerminate(0); delay = new DelayAndTerminate(0);
connectIntraThreads(init.getOutputPort(), delay.getInputPort()); connectStages(init.getOutputPort(), delay.getInputPort());
addThreadableStage(init); addThreadableStage(init);
} }
} }
......
...@@ -23,7 +23,6 @@ import java.util.Set; ...@@ -23,7 +23,6 @@ import java.util.Set;
import org.junit.Test; import org.junit.Test;
import teetime.framework.pipe.IPipe;
import teetime.stage.CountingMapMerger; import teetime.stage.CountingMapMerger;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
...@@ -67,8 +66,8 @@ public class TraversorTest { ...@@ -67,8 +66,8 @@ public class TraversorTest {
// CountingMapMerger (already as field) // CountingMapMerger (already as field)
// Connecting the stages of the first part of the config // Connecting the stages of the first part of the config
connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); connectStages(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort()); connectStages(f2b.getOutputPort(), distributor.getInputPort());
// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
...@@ -76,15 +75,15 @@ public class TraversorTest { ...@@ -76,15 +75,15 @@ public class TraversorTest {
final WordCounter wc = new WordCounter(); final WordCounter wc = new WordCounter();
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000); connectStages(distributor.getNewOutputPort(), wc.getInputPort());
final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort()); connectStages(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread // Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc); addThreadableStage(wc);
} }
// Connect the stages of the last part // Connect the stages of the last part
connectIntraThreads(merger.getOutputPort(), result.getInputPort()); connectStages(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages // Add the first and last part to the threadable stages
addThreadableStage(init); addThreadableStage(init);
......
...@@ -42,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { ...@@ -42,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
Clock clock = new Clock(); Clock clock = new Clock();
clock.setInitialDelayInMs(initialDelayInMs); clock.setInitialDelayInMs(initialDelayInMs);
connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); connectStages(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
return clock; return clock;
} }
...@@ -51,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { ...@@ -51,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements);
delay = new Delay<Object>(); delay = new Delay<Object>();
connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort()); connectStages(initialElementProducer.getOutputPort(), delay.getInputPort());
return initialElementProducer; return initialElementProducer;
} }
...@@ -62,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { ...@@ -62,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
// relay.setIdleStrategy(new WaitStrategy(relay)); // relay.setIdleStrategy(new WaitStrategy(relay));
connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort()); connectStages(delay.getOutputPort(), relay.getInputPort());
connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); connectStages(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink; this.collectorSink = collectorSink;
......
...@@ -44,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration { ...@@ -44,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration {
// relay.setIdleStrategy(new YieldStrategy()); // relay.setIdleStrategy(new YieldStrategy());
connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort()); connectStages(producer.getOutputPort(), relay.getInputPort());
connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); connectStages(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink; this.collectorSink = collectorSink;
......
...@@ -28,7 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { ...@@ -28,7 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration {
second = new ExceptionTestConsumerStage(); second = new ExceptionTestConsumerStage();
third = new ExceptionTestProducerStage(); third = new ExceptionTestProducerStage();
connectBoundedInterThreads(first.getOutputPort(), second.getInputPort()); connectStages(first.getOutputPort(), second.getInputPort());
// this.addThreadableStage(new ExceptionTestStage()); // this.addThreadableStage(new ExceptionTestStage());
this.addThreadableStage(first); this.addThreadableStage(first);
......
...@@ -130,9 +130,9 @@ public class InstanceOfFilterTest { ...@@ -130,9 +130,9 @@ public class InstanceOfFilterTest {
CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>(); CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>();
CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>(); CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>();
connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); connectStages(elementProducer.getOutputPort(), instanceOfFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); connectStages(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort());
connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); connectStages(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort());
addThreadableStage(elementProducer); addThreadableStage(elementProducer);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment