Skip to content
Snippets Groups Projects
Commit d68d091e authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

introduced static methods for connecting stages #138

parent fd0026b4
No related branches found
No related tags found
No related merge requests found
Showing
with 78 additions and 123 deletions
......@@ -18,7 +18,11 @@ package teetime.framework;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
/**
* Represents a configuration of connected stages, which is needed to run a analysis.
......@@ -26,11 +30,17 @@ import teetime.framework.pipe.PipeFactoryRegistry;
*/
public abstract class AnalysisConfiguration {
private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
private final IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
private final IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true);
private final IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
/**
* Can be used by subclasses, to obtain pipe factories
*/
@Deprecated
// TODO: set private
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
List<Stage> getThreadableStageJobs() {
return this.threadableStageJobs;
......@@ -40,10 +50,30 @@ public abstract class AnalysisConfiguration {
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration und executed in a thread.
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
protected void addThreadableStage(final Stage stage) {
this.threadableStageJobs.add(stage);
}
protected <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return intraThreadFactory.create(sourcePort, targetPort);
}
protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interBoundedThreadFactory.create(sourcePort, targetPort);
}
protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interUnboundedThreadFactory.create(sourcePort, targetPort);
}
protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interBoundedThreadFactory.create(sourcePort, targetPort, capacity);
}
protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity);
}
}
......@@ -23,9 +23,6 @@ import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.framework.StageState;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.IterableProducer;
......@@ -80,22 +77,19 @@ public final class StageTester {
private final class Configuration extends AnalysisConfiguration {
public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) {
final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
for (InputHolder<?> inputHolder : inputHolders) {
final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput());
interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort());
connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort());
addThreadableStage(producer);
}
addThreadableStage(stage);
final IPipeFactory intraPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
intraPipeFactory.create(outputHolder.getPort(), sink.getInputPort());
connectIntraThreads(outputHolder.getPort(), sink.getInputPort());
}
}
}
}
......@@ -21,10 +21,7 @@ import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SpScPipe;
import teetime.stage.Clock;
import teetime.stage.CollectorSink;
......@@ -47,8 +44,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
private static final int SPSC_INITIAL_CAPACITY = 4;
private final IPipeFactory intraThreadPipeFactory;
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
......@@ -58,10 +53,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
private Runnable runnable;
private Clock clock;
public MethodCallThroughputAnalysis15() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
}
public void init() {
OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableProducerStage(clockPipeline);
......@@ -107,15 +98,15 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
connectIntraThreads(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort());
connectIntraThreads(delay.getOutputPort(), collectorSink.getInputPort());
return pipeline;
}
......
......@@ -21,9 +21,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SpScPipe;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......@@ -46,8 +43,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
private static final int SPSC_INITIAL_CAPACITY = 100100;
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private final IPipeFactory intraThreadPipeFactory;
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private final int numNoopFilters;
......@@ -59,7 +54,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) {
this.numWorkerThreads = numWorkerThreads;
this.numNoopFilters = numNoopFilters;
this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
}
public void build() {
......@@ -87,7 +81,7 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(distributor);
intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort());
connectIntraThreads(objectProducer.getOutputPort(), distributor.getInputPort());
return pipeline;
}
......@@ -117,15 +111,15 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort());
connectIntraThreads(relay.getOutputPort(), startTimestampFilter.getInputPort());
intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
connectIntraThreads(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
connectIntraThreads(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
return pipeline;
}
......
......@@ -16,17 +16,13 @@
package teetime.examples.loopStage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public class LoopStageAnalysisConfiguration extends AnalysisConfiguration {
public LoopStageAnalysisConfiguration() {
Countdown countdown = new Countdown(10);
IPipeFactory factory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.QUEUE_BASED, true);
factory.create(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort());
connectIntraThreads(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort());
// this.getFiniteProducerStages().add(countdown);
this.addThreadableStage(countdown);
......
......@@ -18,9 +18,6 @@ package teetime.examples.cipher;
import java.io.File;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CipherByteArray;
import teetime.stage.CipherByteArray.CipherMode;
import teetime.stage.InitialElementProducer;
......@@ -43,14 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration {
final CipherByteArray decrypt = new CipherByteArray(password, CipherMode.DECRYPT);
final ByteArrayFileWriter writer = new ByteArrayFileWriter(output);
final IPipeFactory intraFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
intraFactory.create(init.getOutputPort(), f2b.getInputPort());
intraFactory.create(f2b.getOutputPort(), enc.getInputPort());
intraFactory.create(enc.getOutputPort(), comp.getInputPort());
intraFactory.create(comp.getOutputPort(), decomp.getInputPort());
intraFactory.create(decomp.getOutputPort(), decrypt.getInputPort());
intraFactory.create(decrypt.getOutputPort(), writer.getInputPort());
connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), enc.getInputPort());
connectIntraThreads(enc.getOutputPort(), comp.getInputPort());
connectIntraThreads(comp.getOutputPort(), decomp.getInputPort());
connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort());
connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort());
// this.getFiniteProducerStages().add(init);
this.addThreadableStage(init);
......
......@@ -18,9 +18,6 @@ package teetime.examples.tokenizer;
import java.io.File;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.ByteArray2String;
import teetime.stage.CipherByteArray;
import teetime.stage.CipherByteArray.CipherMode;
......@@ -33,7 +30,6 @@ import teetime.stage.string.Tokenizer;
public class TokenizerConfiguration extends AnalysisConfiguration {
private static final IPipeFactory INTRA_PIPE_FACTORY = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
private final Counter<String> counter;
public TokenizerConfiguration(final String inputFile, final String password) {
......@@ -47,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration {
final Tokenizer tokenizer = new Tokenizer(" ");
this.counter = new Counter<String>();
INTRA_PIPE_FACTORY.create(init.getOutputPort(), f2b.getInputPort());
INTRA_PIPE_FACTORY.create(f2b.getOutputPort(), decomp.getInputPort());
INTRA_PIPE_FACTORY.create(decomp.getOutputPort(), decrypt.getInputPort());
INTRA_PIPE_FACTORY.create(decrypt.getOutputPort(), b2s.getInputPort());
INTRA_PIPE_FACTORY.create(b2s.getOutputPort(), tokenizer.getInputPort());
INTRA_PIPE_FACTORY.create(tokenizer.getOutputPort(), this.counter.getInputPort());
connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort());
connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort());
connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort());
connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort());
connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort());
this.addThreadableStage(init);
}
......
......@@ -25,9 +25,6 @@ import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.InitialElementProducer;
import teetime.util.StopWatch;
......@@ -69,13 +66,12 @@ public class AnalysisTest {
}
private static class TestConfig extends AnalysisConfiguration {
final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
public final DelayAndTerminate delay;
public TestConfig() {
final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello");
delay = new DelayAndTerminate(DELAY_IN_MS);
intraFact.create(init.getOutputPort(), delay.getInputPort());
connectIntraThreads(init.getOutputPort(), delay.getInputPort());
addThreadableStage(init);
}
}
......
......@@ -18,9 +18,6 @@ package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -38,8 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink);
IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
pipeFactory.create(producer.getOutputPort(), collectorSink.getInputPort());
connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
}
......
......@@ -23,9 +23,6 @@ import static org.junit.Assert.assertThat;
import org.junit.Assert;
import org.junit.Test;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Cache;
import teetime.stage.Counter;
import teetime.stage.InitialElementProducer;
......@@ -57,14 +54,13 @@ public class StageTest {
}
private static class TestConfig extends AnalysisConfiguration {
final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
public final DelayAndTerminate delay;
public InitialElementProducer<String> init;
public TestConfig() {
init = new InitialElementProducer<String>("Hello");
delay = new DelayAndTerminate(0);
intraFact.create(init.getOutputPort(), delay.getInputPort());
connectIntraThreads(init.getOutputPort(), delay.getInputPort());
addThreadableStage(init);
}
}
......
......@@ -24,9 +24,6 @@ import java.util.Set;
import org.junit.Test;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CountingMapMerger;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.Distributor;
......@@ -69,13 +66,9 @@ public class TraversorTest {
final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
// CountingMapMerger (already as field)
// PipeFactory instaces for intra- and inter-thread communication
final IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
// Connecting the stages of the first part of the config
intraFact.create(init.getOutputPort(), f2b.getInputPort());
intraFact.create(f2b.getOutputPort(), distributor.getInputPort());
connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort());
// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
for (int i = 0; i < threads; i++) {
......@@ -83,15 +76,15 @@ public class TraversorTest {
final WordCounter wc = new WordCounter();
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000);
final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort());
final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000);
final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc);
}
// Connect the stages of the last part
intraFact.create(merger.getOutputPort(), result.getInputPort());
connectIntraThreads(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
addThreadableStage(init);
......
......@@ -15,9 +15,6 @@
*/
package teetime.framework;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -26,15 +23,10 @@ import teetime.stage.basic.Delay;
class WaitStrategyConfiguration extends AnalysisConfiguration {
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private Delay<Object> delay;
private CollectorSink<Object> collectorSink;
public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
Stage producer = buildProducer(elements);
addThreadableStage(producer);
......@@ -50,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
Clock clock = new Clock();
clock.setInitialDelayInMs(initialDelayInMs);
interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
return clock;
}
......@@ -59,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements);
delay = new Delay<Object>();
intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort());
connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort());
return initialElementProducer;
}
......@@ -70,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
// relay.setIdleStrategy(new WaitStrategy(relay));
interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort());
connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
......
......@@ -15,22 +15,15 @@
*/
package teetime.framework;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.Relay;
class YieldStrategyConfiguration extends AnalysisConfiguration {
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private CollectorSink<Object> collectorSink;
public YieldStrategyConfiguration(final Object... elements) {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
InitialElementProducer<Object> producer = buildProducer(elements);
addThreadableStage(producer);
......@@ -51,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration {
// relay.setIdleStrategy(new YieldStrategy());
interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort());
connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
......
......@@ -60,6 +60,7 @@ public class ExceptionHandlingTest {
exceptionArised = true;
}
assertTrue(exceptionArised);
exceptionArised = false;
try {
terminatesAllStages();
......
......@@ -16,8 +16,6 @@
package teetime.framework.exceptionHandling;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public class ExceptionTestConfiguration extends AnalysisConfiguration {
......@@ -30,8 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration {
second = new ExceptionTestConsumerStage();
third = new ExceptionTestProducerStage();
PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false)
.create(first.getOutputPort(), second.getInputPort());
connectBoundedInterThreads(first.getOutputPort(), second.getInputPort());
// this.addThreadableStage(new ExceptionTestStage());
this.addThreadableStage(first);
......
......@@ -32,9 +32,6 @@ import org.junit.Test;
import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.AnalysisException;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.util.Pair;
/**
......@@ -127,17 +124,15 @@ public class InstanceOfFilterTest {
private static class InstanceOfFilterTestConfig extends AnalysisConfiguration {
private final IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
public InstanceOfFilterTestConfig() {
InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>();
InstanceOfFilter<Object, Clazz> instanceOfFilter = new InstanceOfFilter<Object, Clazz>(Clazz.class);
CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>();
CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>();
pipeFactory.create(elementProducer.getOutputPort(), instanceOfFilter.getInputPort());
pipeFactory.create(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort());
pipeFactory.create(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort());
connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort());
connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort());
connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort());
addThreadableStage(elementProducer);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment