Skip to content
Snippets Groups Projects
Commit 3e74fe4a authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

renamed method

parent 43f27c9d
No related branches found
No related tags found
No related merge requests found
Showing
with 33 additions and 33 deletions
......@@ -38,8 +38,8 @@ public abstract class AbstractCompositeStage {
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
protected final void addThreadableStage(final Stage stage) {
this.addThreadableStage(stage, stage.getId());
protected final void declareActive(final Stage stage) {
this.declareActive(stage, stage.getId());
}
/**
......@@ -50,7 +50,7 @@ public abstract class AbstractCompositeStage {
* @param threadName
* A string which can be used for debugging.
*/
protected void addThreadableStage(final Stage stage, final String threadName) {
protected void declareActive(final Stage stage, final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
Thread newThread = new TeeTimeThread(runnable, threadName);
stage.setOwningThread(newThread);
......@@ -89,7 +89,7 @@ public abstract class AbstractCompositeStage {
private final <T> void connectPortsInternal(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
if (sourcePort.getOwningStage().getOwningThread() == null) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
}
}
......
......@@ -56,9 +56,9 @@ public abstract class Configuration extends AbstractCompositeStage {
}
@Override
protected void addThreadableStage(final Stage stage, final String threadName) {
protected void declareActive(final Stage stage, final String threadName) {
startStage = stage; // memorize an arbitrary stage as starting point for traversing
super.addThreadableStage(stage, threadName);
super.declareActive(stage, threadName);
}
@Override
......
......@@ -19,7 +19,7 @@ import java.util.Set;
/**
* Represents a context that is used by a configuration and composite stages to connect ports, for example.
* Stages can be added by executing {@link #addThreadableStage(Stage)}.
* Stages can be added by executing {@link #declareActive(Stage)}.
*
* @since 2.0
*/
......
......@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> {
}
void startStageAtRuntime(final Stage newStage) {
configuration.addThreadableStage(newStage);
configuration.declareActive(newStage);
Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages);
......
......@@ -91,7 +91,7 @@ public final class StageTester {
connectPorts(producer.getOutputPort(), inputHolder.getPort());
}
addThreadableStage(stage);
declareActive(stage);
for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
......
......@@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000);
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(threadableStage.getInputPort().getOwningStage());
declareActive(threadableStage.getInputPort().getOwningStage());
distributorPorts.add(threadableStage.getInputPort());
mergerPorts.add(wc.getOutputPort());
......@@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration {
connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
addThreadableStage(init);
addThreadableStage(merger);
declareActive(init);
declareActive(merger);
}
public MonitoringThread getMonitoringThread() {
......
......@@ -52,7 +52,7 @@ public class AbstractCompositeStageTest {
private final Counter firstCounter = new Counter();
public TestCompositeOneStage() {
addThreadableStage(firstCounter);
declareActive(firstCounter);
}
}
......@@ -63,7 +63,7 @@ public class AbstractCompositeStageTest {
private final Counter secondCounter = new Counter();
public TestCompositeTwoStage() {
addThreadableStage(firstCounter);
declareActive(firstCounter);
connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort());
}
......
......@@ -118,7 +118,7 @@ public class ExecutionTest {
public AnalysisTestConfig(final boolean inter) {
connectPorts(init.getOutputPort(), sink.getInputPort());
if (inter) {
addThreadableStage(sink);
declareActive(sink);
}
}
}
......@@ -143,7 +143,7 @@ public class ExecutionTest {
connectPorts(init.getOutputPort(), iof.getInputPort());
connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
connectPorts(init.createOutputPort(), sink.createInputPort());
addThreadableStage(iof);
declareActive(iof);
}
}
......@@ -191,7 +191,7 @@ public class ExecutionTest {
stageWithNamedThread = new InitialElementProducer<Object>(new Object());
Sink<Object> sink = new Sink<Object>();
addThreadableStage(stageWithNamedThread, "TestName");
declareActive(stageWithNamedThread, "TestName");
connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort());
}
......
......@@ -30,11 +30,11 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
public RunnableConsumerStageTestConfiguration(final Integer... inputElements) {
InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements);
if (inputElements.length > 0) {
addThreadableStage(producer);
declareActive(producer);
}
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink);
declareActive(collectorSink);
// Can not use createPorts, as the if condition above will lead to an exception
new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
......
......@@ -55,11 +55,11 @@ public class TerminationTest {
connectPorts(init.getOutputPort(), firstProp.getInputPort());
connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity);
connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort());
addThreadableStage(sinkStage);
declareActive(sinkStage);
} else {
Sink<Integer> sink = new Sink<Integer>();
connectPorts(init.getOutputPort(), sink.getInputPort(), capacity);
addThreadableStage(sink);
declareActive(sink);
}
}
......
......@@ -86,14 +86,14 @@ public class TraverserTest {
connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc.getInputPort().getOwningStage());
declareActive(wc.getInputPort().getOwningStage());
}
// Connect the stages of the last part
connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
addThreadableStage(merger);
declareActive(merger);
}
}
......
......@@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration {
public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
Stage producer = buildProducer(elements);
addThreadableStage(producer);
declareActive(producer);
Stage consumer = buildConsumer(delay);
addThreadableStage(consumer);
declareActive(consumer);
Clock clock = buildClock(initialDelayInMs, delay);
addThreadableStage(clock);
declareActive(clock);
}
private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) {
......
......@@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration {
public YieldStrategyConfiguration(final Object... elements) {
InitialElementProducer<Object> producer = buildProducer(elements);
addThreadableStage(producer);
declareActive(producer);
Stage consumer = buildConsumer(producer);
addThreadableStage(consumer);
declareActive(consumer);
}
private InitialElementProducer<Object> buildProducer(final Object... elements) {
......
......@@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration {
connectPorts(first.getOutputPort(), second.getInputPort());
// this.addThreadableStage(new ExceptionTestStage());
this.addThreadableStage(second);
this.addThreadableStage(third);
this.declareActive(second);
this.declareActive(third);
}
}
......@@ -137,8 +137,8 @@ public class DynamicDistributorTest {
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(distributor);
addThreadableStage(collectorSink);
declareActive(distributor);
declareActive(collectorSink);
for (PortAction<DynamicDistributor<T>> a : inputActions) {
distributor.addPortActionRequest(a);
......
......@@ -118,7 +118,7 @@ public class DynamicMergerTest {
connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
connectPorts(merger.getOutputPort(), collectorSink.getInputPort());
addThreadableStage(merger);
declareActive(merger);
for (PortAction<DynamicMerger<T>> a : inputActions) {
boolean added = merger.addPortActionRequest(a);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment