Skip to content
Snippets Groups Projects
Commit eac79789 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'active-declaration' into 'master'

Active declaration

fixes #174
renamed addThreadableStage to declareActive and added a flag to Stage indicating if the current stage is active or not

See merge request !54
parents 0bfd1018 abeb2125
Branches
Tags
No related merge requests found
Showing
with 56 additions and 36 deletions
...@@ -38,8 +38,8 @@ public abstract class AbstractCompositeStage { ...@@ -38,8 +38,8 @@ public abstract class AbstractCompositeStage {
* @param stage * @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread. * A arbitrary stage, which will be added to the configuration and executed in a thread.
*/ */
protected final void addThreadableStage(final Stage stage) { protected final void declareActive(final Stage stage) {
this.addThreadableStage(stage, stage.getId()); this.declareActive(stage, stage.getId());
} }
/** /**
...@@ -50,10 +50,11 @@ public abstract class AbstractCompositeStage { ...@@ -50,10 +50,11 @@ public abstract class AbstractCompositeStage {
* @param threadName * @param threadName
* A string which can be used for debugging. * A string which can be used for debugging.
*/ */
protected void addThreadableStage(final Stage stage, final String threadName) { protected void declareActive(final Stage stage, final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
Thread newThread = new TeeTimeThread(runnable, threadName); Thread newThread = new TeeTimeThread(runnable, threadName);
stage.setOwningThread(newThread); stage.setOwningThread(newThread);
stage.setActive(true);
} }
/** /**
...@@ -85,7 +86,7 @@ public abstract class AbstractCompositeStage { ...@@ -85,7 +86,7 @@ public abstract class AbstractCompositeStage {
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
if (sourcePort.getOwningStage().getOwningThread() == null) { if (sourcePort.getOwningStage().getOwningThread() == null) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
} }
} }
......
...@@ -56,9 +56,9 @@ public abstract class Configuration extends AbstractCompositeStage { ...@@ -56,9 +56,9 @@ public abstract class Configuration extends AbstractCompositeStage {
} }
@Override @Override
protected void addThreadableStage(final Stage stage, final String threadName) { protected void declareActive(final Stage stage, final String threadName) {
startStage = stage; // memorize an arbitrary stage as starting point for traversing startStage = stage; // memorize an arbitrary stage as starting point for traversing
super.addThreadableStage(stage, threadName); super.declareActive(stage, threadName);
} }
@Override @Override
......
...@@ -19,7 +19,7 @@ import java.util.Set; ...@@ -19,7 +19,7 @@ import java.util.Set;
/** /**
* Represents a context that is used by a configuration and composite stages to connect ports, for example. * Represents a context that is used by a configuration and composite stages to connect ports, for example.
* Stages can be added by executing {@link #addThreadableStage(Stage)}. * Stages can be added by executing {@link #declareActive(Stage)}.
* *
* @since 2.0 * @since 2.0
*/ */
......
...@@ -50,6 +50,8 @@ public abstract class Stage { ...@@ -50,6 +50,8 @@ public abstract class Stage {
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
private Thread owningThread; private Thread owningThread;
private boolean isActive;
private ConfigurationContext owningContext; private ConfigurationContext owningContext;
ConfigurationContext getOwningContext() { ConfigurationContext getOwningContext() {
...@@ -181,4 +183,12 @@ public abstract class Stage { ...@@ -181,4 +183,12 @@ public abstract class Stage {
protected abstract void removeDynamicPort(InputPort<?> inputPort); protected abstract void removeDynamicPort(InputPort<?> inputPort);
public boolean isActive() {
return isActive;
}
void setActive(final boolean isActive) {
this.isActive = isActive;
}
} }
...@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> {
} }
void startStageAtRuntime(final Stage newStage) { void startStageAtRuntime(final Stage newStage) {
configuration.addThreadableStage(newStage); configuration.declareActive(newStage);
Set<Stage> newThreadableStages = initialize(newStage); Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages); startThreads(newThreadableStages);
......
...@@ -91,7 +91,7 @@ public final class StageTester { ...@@ -91,7 +91,7 @@ public final class StageTester {
connectPorts(producer.getOutputPort(), inputHolder.getPort()); connectPorts(producer.getOutputPort(), inputHolder.getPort());
} }
addThreadableStage(stage); declareActive(stage);
for (OutputHolder<?> outputHolder : outputHolders) { for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
......
...@@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration { ...@@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000); connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000);
connectPorts(wc.getOutputPort(), merger.getNewInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread // Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(threadableStage.getInputPort().getOwningStage()); declareActive(threadableStage.getInputPort().getOwningStage());
distributorPorts.add(threadableStage.getInputPort()); distributorPorts.add(threadableStage.getInputPort());
mergerPorts.add(wc.getOutputPort()); mergerPorts.add(wc.getOutputPort());
...@@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration { ...@@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(merger.getOutputPort(), result.getInputPort()); connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages // Add the first and last part to the threadable stages
addThreadableStage(init); declareActive(init);
addThreadableStage(merger); declareActive(merger);
} }
public MonitoringThread getMonitoringThread() { public MonitoringThread getMonitoringThread() {
......
...@@ -27,17 +27,17 @@ public class AbstractCompositeStageTest { ...@@ -27,17 +27,17 @@ public class AbstractCompositeStageTest {
@Ignore @Ignore
@Test @Test
public void testNestedStages() { public void testNestedStages() {
Execution<NestesConfig> exec = new Execution<NestesConfig>(new NestesConfig()); Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf());
// assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); // assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3));
} }
private class NestesConfig extends Configuration { private class NestedConf extends Configuration {
private final InitialElementProducer<Object> init; private final InitialElementProducer<Object> init;
private final Sink sink; private final Sink sink;
private final TestNestingCompositeStage compositeStage; private final TestNestingCompositeStage compositeStage;
public NestesConfig() { public NestedConf() {
init = new InitialElementProducer<Object>(new Object()); init = new InitialElementProducer<Object>(new Object());
sink = new Sink(); sink = new Sink();
compositeStage = new TestNestingCompositeStage(); compositeStage = new TestNestingCompositeStage();
...@@ -52,7 +52,7 @@ public class AbstractCompositeStageTest { ...@@ -52,7 +52,7 @@ public class AbstractCompositeStageTest {
private final Counter firstCounter = new Counter(); private final Counter firstCounter = new Counter();
public TestCompositeOneStage() { public TestCompositeOneStage() {
addThreadableStage(firstCounter); declareActive(firstCounter);
} }
} }
...@@ -63,7 +63,7 @@ public class AbstractCompositeStageTest { ...@@ -63,7 +63,7 @@ public class AbstractCompositeStageTest {
private final Counter secondCounter = new Counter(); private final Counter secondCounter = new Counter();
public TestCompositeTwoStage() { public TestCompositeTwoStage() {
addThreadableStage(firstCounter); declareActive(firstCounter);
connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort());
} }
......
...@@ -118,7 +118,7 @@ public class ExecutionTest { ...@@ -118,7 +118,7 @@ public class ExecutionTest {
public AnalysisTestConfig(final boolean inter) { public AnalysisTestConfig(final boolean inter) {
connectPorts(init.getOutputPort(), sink.getInputPort()); connectPorts(init.getOutputPort(), sink.getInputPort());
if (inter) { if (inter) {
addThreadableStage(sink); declareActive(sink);
} }
} }
} }
...@@ -143,7 +143,7 @@ public class ExecutionTest { ...@@ -143,7 +143,7 @@ public class ExecutionTest {
connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(init.getOutputPort(), iof.getInputPort());
connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
connectPorts(init.createOutputPort(), sink.createInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort());
addThreadableStage(iof); declareActive(iof);
} }
} }
...@@ -191,7 +191,7 @@ public class ExecutionTest { ...@@ -191,7 +191,7 @@ public class ExecutionTest {
stageWithNamedThread = new InitialElementProducer<Object>(new Object()); stageWithNamedThread = new InitialElementProducer<Object>(new Object());
Sink<Object> sink = new Sink<Object>(); Sink<Object> sink = new Sink<Object>();
addThreadableStage(stageWithNamedThread, "TestName"); declareActive(stageWithNamedThread, "TestName");
connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort()); connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort());
} }
......
...@@ -30,11 +30,11 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { ...@@ -30,11 +30,11 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
public RunnableConsumerStageTestConfiguration(final Integer... inputElements) { public RunnableConsumerStageTestConfiguration(final Integer... inputElements) {
InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements); InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements);
if (inputElements.length > 0) { if (inputElements.length > 0) {
addThreadableStage(producer); declareActive(producer);
} }
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink); declareActive(collectorSink);
// Can not use createPorts, as the if condition above will lead to an exception // Can not use createPorts, as the if condition above will lead to an exception
new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
......
...@@ -18,7 +18,9 @@ package teetime.framework; ...@@ -18,7 +18,9 @@ package teetime.framework;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -53,6 +55,13 @@ public class StageTest { ...@@ -53,6 +55,13 @@ public class StageTest {
assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener); assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener);
} }
@Test
public void testActiveFlag() {
TestConfig config = new TestConfig();
assertTrue(config.init.isActive());
assertFalse(config.delay.isActive());
}
private static class TestConfig extends Configuration { private static class TestConfig extends Configuration {
public final DelayAndTerminate delay; public final DelayAndTerminate delay;
public InitialElementProducer<String> init; public InitialElementProducer<String> init;
......
...@@ -55,11 +55,11 @@ public class TerminationTest { ...@@ -55,11 +55,11 @@ public class TerminationTest {
connectPorts(init.getOutputPort(), firstProp.getInputPort()); connectPorts(init.getOutputPort(), firstProp.getInputPort());
connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity); connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity);
connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort()); connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort());
addThreadableStage(sinkStage); declareActive(sinkStage);
} else { } else {
Sink<Integer> sink = new Sink<Integer>(); Sink<Integer> sink = new Sink<Integer>();
connectPorts(init.getOutputPort(), sink.getInputPort(), capacity); connectPorts(init.getOutputPort(), sink.getInputPort(), capacity);
addThreadableStage(sink); declareActive(sink);
} }
} }
......
...@@ -86,14 +86,14 @@ public class TraverserTest { ...@@ -86,14 +86,14 @@ public class TraverserTest {
connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
connectPorts(wc.getOutputPort(), merger.getNewInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread // Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc.getInputPort().getOwningStage()); declareActive(wc.getInputPort().getOwningStage());
} }
// Connect the stages of the last part // Connect the stages of the last part
connectPorts(merger.getOutputPort(), result.getInputPort()); connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages // Add the first and last part to the threadable stages
addThreadableStage(merger); declareActive(merger);
} }
} }
......
...@@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration { ...@@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration {
public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
Stage producer = buildProducer(elements); Stage producer = buildProducer(elements);
addThreadableStage(producer); declareActive(producer);
Stage consumer = buildConsumer(delay); Stage consumer = buildConsumer(delay);
addThreadableStage(consumer); declareActive(consumer);
Clock clock = buildClock(initialDelayInMs, delay); Clock clock = buildClock(initialDelayInMs, delay);
addThreadableStage(clock); declareActive(clock);
} }
private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) {
......
...@@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration { ...@@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration {
public YieldStrategyConfiguration(final Object... elements) { public YieldStrategyConfiguration(final Object... elements) {
InitialElementProducer<Object> producer = buildProducer(elements); InitialElementProducer<Object> producer = buildProducer(elements);
addThreadableStage(producer); declareActive(producer);
Stage consumer = buildConsumer(producer); Stage consumer = buildConsumer(producer);
addThreadableStage(consumer); declareActive(consumer);
} }
private InitialElementProducer<Object> buildProducer(final Object... elements) { private InitialElementProducer<Object> buildProducer(final Object... elements) {
......
...@@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration { ...@@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration {
connectPorts(first.getOutputPort(), second.getInputPort()); connectPorts(first.getOutputPort(), second.getInputPort());
// this.addThreadableStage(new ExceptionTestStage()); // this.addThreadableStage(new ExceptionTestStage());
this.addThreadableStage(second); this.declareActive(second);
this.addThreadableStage(third); this.declareActive(third);
} }
} }
...@@ -137,8 +137,8 @@ public class DynamicDistributorTest { ...@@ -137,8 +137,8 @@ public class DynamicDistributorTest {
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(distributor); declareActive(distributor);
addThreadableStage(collectorSink); declareActive(collectorSink);
for (PortAction<DynamicDistributor<T>> a : inputActions) { for (PortAction<DynamicDistributor<T>> a : inputActions) {
distributor.addPortActionRequest(a); distributor.addPortActionRequest(a);
......
...@@ -118,7 +118,7 @@ public class DynamicMergerTest { ...@@ -118,7 +118,7 @@ public class DynamicMergerTest {
connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort()); connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
connectPorts(merger.getOutputPort(), collectorSink.getInputPort()); connectPorts(merger.getOutputPort(), collectorSink.getInputPort());
addThreadableStage(merger); declareActive(merger);
for (PortAction<DynamicMerger<T>> a : inputActions) { for (PortAction<DynamicMerger<T>> a : inputActions) {
boolean added = merger.addPortActionRequest(a); boolean added = merger.addPortActionRequest(a);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment