Skip to content
Snippets Groups Projects
Commit 071bf905 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'moved-declareActive' into 'master'

moved declareActive to Stage

See merge request !58
parents e8e48ee6 16e07237
No related branches found
No related tags found
No related merge requests found
Showing
with 65 additions and 59 deletions
......@@ -32,31 +32,6 @@ public abstract class AbstractCompositeStage {
*/
private static final int DEFAULT_CAPACITY = 4;
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
protected final void declareActive(final Stage stage) {
this.declareActive(stage, stage.getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
protected void declareActive(final Stage stage, final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
Thread newThread = new TeeTimeThread(runnable, threadName);
stage.setOwningThread(newThread);
stage.setActive(true);
}
/**
* Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}.
*
......@@ -86,7 +61,7 @@ public abstract class AbstractCompositeStage {
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
if (sourcePort.getOwningStage().getOwningThread() == null) {
declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
sourcePort.getOwningStage().declareActive();
}
}
......
......@@ -64,10 +64,14 @@ public abstract class Configuration extends AbstractCompositeStage {
return factory;
}
@Override
protected void declareActive(final Stage stage, final String threadName) {
startStage = stage; // memorize an arbitrary stage as starting point for traversing
super.declareActive(stage, threadName);
/**
* Register pipes if your configuration only relies on custom pipes and therefore {@link #connectPorts(OutputPort, InputPort)} is never called.
*
* @param pipe
* A custom pipe instance
*/
protected void registerCustomPipe(final AbstractPipe<?> pipe) {
startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing
}
@Override
......
......@@ -191,4 +191,29 @@ public abstract class Stage {
this.isActive = isActive;
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
public void declareActive() {
declareActive(getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
public void declareActive(final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(this);
Thread newThread = new TeeTimeThread(runnable, threadName);
this.setOwningThread(newThread);
this.setActive(true);
}
}
......@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> {
}
void startStageAtRuntime(final Stage newStage) {
configuration.declareActive(newStage);
newStage.declareActive();
Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages);
......
......@@ -91,7 +91,7 @@ public final class StageTester {
connectPorts(producer.getOutputPort(), inputHolder.getPort());
}
declareActive(stage);
stage.declareActive();
for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
......
......@@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000);
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
declareActive(threadableStage.getInputPort().getOwningStage());
threadableStage.getInputPort().getOwningStage().declareActive();
distributorPorts.add(threadableStage.getInputPort());
mergerPorts.add(wc.getOutputPort());
......@@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
declareActive(init);
declareActive(merger);
init.declareActive();
merger.declareActive();
}
public MonitoringThread getMonitoringThread() {
......
......@@ -34,12 +34,12 @@ public class AbstractCompositeStageTest {
private class NestedConf extends Configuration {
private final InitialElementProducer<Object> init;
private final Sink sink;
private final Sink<Object> sink;
private final TestNestingCompositeStage compositeStage;
public NestedConf() {
init = new InitialElementProducer<Object>(new Object());
sink = new Sink();
sink = new Sink<Object>();
compositeStage = new TestNestingCompositeStage();
connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort());
connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort());
......@@ -52,7 +52,7 @@ public class AbstractCompositeStageTest {
private final Counter firstCounter = new Counter();
public TestCompositeOneStage() {
declareActive(firstCounter);
firstCounter.declareActive();
}
}
......@@ -63,7 +63,7 @@ public class AbstractCompositeStageTest {
private final Counter secondCounter = new Counter();
public TestCompositeTwoStage() {
declareActive(firstCounter);
firstCounter.declareActive();
connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort());
}
......
......@@ -118,7 +118,7 @@ public class ExecutionTest {
public AnalysisTestConfig(final boolean inter) {
connectPorts(init.getOutputPort(), sink.getInputPort());
if (inter) {
declareActive(sink);
sink.declareActive();
}
}
}
......@@ -143,7 +143,7 @@ public class ExecutionTest {
connectPorts(init.getOutputPort(), iof.getInputPort());
connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
connectPorts(init.createOutputPort(), sink.createInputPort());
declareActive(iof);
iof.declareActive();
}
}
......@@ -191,7 +191,7 @@ public class ExecutionTest {
stageWithNamedThread = new InitialElementProducer<Object>(new Object());
Sink<Object> sink = new Sink<Object>();
declareActive(stageWithNamedThread, "TestName");
stageWithNamedThread.declareActive("TestName");
connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort());
}
......
......@@ -18,6 +18,7 @@ package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -30,14 +31,15 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
public RunnableConsumerStageTestConfiguration(final Integer... inputElements) {
InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements);
if (inputElements.length > 0) {
declareActive(producer);
producer.declareActive();
}
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
declareActive(collectorSink);
collectorSink.declareActive();
// Can not use createPorts, as the if condition above will lead to an exception
new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
registerCustomPipe((AbstractPipe<?>) pipe);
this.collectorSink = collectorSink;
}
......
......@@ -55,11 +55,11 @@ public class TerminationTest {
connectPorts(init.getOutputPort(), firstProp.getInputPort());
connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity);
connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort());
declareActive(sinkStage);
sinkStage.declareActive();
} else {
Sink<Integer> sink = new Sink<Integer>();
connectPorts(init.getOutputPort(), sink.getInputPort(), capacity);
declareActive(sink);
sink.declareActive();
}
}
......
......@@ -86,14 +86,14 @@ public class TraverserTest {
connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
declareActive(wc.getInputPort().getOwningStage());
wc.getInputPort().getOwningStage().declareActive();
}
// Connect the stages of the last part
connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
declareActive(merger);
merger.declareActive();
}
}
......
......@@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration {
public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
Stage producer = buildProducer(elements);
declareActive(producer);
producer.declareActive();
Stage consumer = buildConsumer(delay);
declareActive(consumer);
consumer.declareActive();
Clock clock = buildClock(initialDelayInMs, delay);
declareActive(clock);
clock.declareActive();
}
private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) {
......
......@@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration {
public YieldStrategyConfiguration(final Object... elements) {
InitialElementProducer<Object> producer = buildProducer(elements);
declareActive(producer);
producer.declareActive();
Stage consumer = buildConsumer(producer);
declareActive(consumer);
consumer.declareActive();
}
private InitialElementProducer<Object> buildProducer(final Object... elements) {
......
......@@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration {
connectPorts(first.getOutputPort(), second.getInputPort());
// this.addThreadableStage(new ExceptionTestStage());
this.declareActive(second);
this.declareActive(third);
second.declareActive();
third.declareActive();
}
}
......@@ -137,8 +137,8 @@ public class DynamicDistributorTest {
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
declareActive(distributor);
declareActive(collectorSink);
distributor.declareActive();
collectorSink.declareActive();
for (PortAction<DynamicDistributor<T>> a : inputActions) {
distributor.addPortActionRequest(a);
......
......@@ -118,7 +118,7 @@ public class DynamicMergerTest {
connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
connectPorts(merger.getOutputPort(), collectorSink.getInputPort());
declareActive(merger);
merger.declareActive();
for (PortAction<DynamicMerger<T>> a : inputActions) {
boolean added = merger.addPortActionRequest(a);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment