Skip to content
Snippets Groups Projects
Commit 48fb3a87 authored by Christian Wulf's avatar Christian Wulf
Browse files

finished new context implementation

parent 0d5f73b8
No related branches found
No related tags found
No related merge requests found
......@@ -15,10 +15,10 @@ public class A1ThreadableStageCollector implements ITraverserVisitor {
@Override
public VisitorBehavior visit(final Stage stage) {
if (stage.getOwningThread() != null && !threadableStages.contains(stage)) {
if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) {
threadableStages.add(stage);
}
return VisitorBehavior.CONTINUE;
return stage.getCurrentState() == StageState.CREATED ? VisitorBehavior.CONTINUE : VisitorBehavior.STOP;
}
@Override
......
......@@ -32,16 +32,6 @@ public abstract class AbstractCompositeStage {
*/
private static final int DEFAULT_CAPACITY = 4;
// private final ConfigurationContext context;
public AbstractCompositeStage() {
// this.context = new ConfigurationContext(this);
}
// ConfigurationContext getContext() {
// return context;
// }
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
......@@ -61,7 +51,6 @@ public abstract class AbstractCompositeStage {
* A string which can be used for debugging.
*/
protected void addThreadableStage(final Stage stage, final String threadName) {
// context.addThreadableStage(stage, threadName);
AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
Thread newThread = new TeeTimeThread(runnable, threadName);
stage.setOwningThread(newThread);
......@@ -94,26 +83,16 @@ public abstract class AbstractCompositeStage {
* the type of elements to be sent
*/
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
// context.connectPorts(sourcePort, targetPort, capacity);
connectPortsInternal(sourcePort, targetPort, capacity);
}
private final <T> void connectPortsInternal(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
// if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) {
if (sourcePort.getOwningStage().getOwningThread() == null) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
}
}
// if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) {
// LOGGER.warn("Overwriting existing pipe while connecting stages " +
// sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + ".");
// }
// addChildContext(sourcePort.getOwningStage());
// addChildContext(targetPort.getOwningStage());
new InstantiationPipe<T>(sourcePort, targetPort, capacity);
}
......
......@@ -14,14 +14,10 @@ abstract class AbstractService<T> {
abstract void onInitialize();
abstract void onStart();
abstract void onExecute();
abstract void onTerminate();
abstract void onFinish();
// abstract void merge(T source);
}
......@@ -25,36 +25,5 @@ public final class RuntimeServiceFacade {
public void startWithinNewThread(final Stage previousStage, final Stage stage) {
previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage);
// SignalingCounter runtimeCounter = previousStage.getOwningContext().getThreadService().getRunnableCounter();
// SignalingCounter newCounter = stage.getOwningContext().getThreadService().getRunnableCounter();
// runtimeCounter.inc(newCounter);
// stage.logger.error(stage.owningContext.getThreadService().getRunnableCounter().toString());
// !!! stage.owningContext = XXX.owningContext !!!
// Runnable runnable = AbstractRunnableStage.create(stage);
// Thread thread = new Thread(runnable);
//
// stage.setOwningThread(thread);
// stage.setExceptionHandler(null);
//
// thread.start();
// requirements:
// 1. all new threads from stage must be known to the global context
// 2. number of active threads must be increased by the stage
// if (runnable instanceof RunnableConsumerStage) {
// // do nothing
// } else if (runnable instanceof RunnableProducerStage) {
// ((RunnableProducerStage) runnable).triggerInitializingSignal();
// ((RunnableProducerStage) runnable).triggerStartingSignal();
// } else {
// // TODO
// }
// stage.onSignal(signal, inputPort);
}
}
......@@ -39,16 +39,17 @@ class ThreadService extends AbstractService<ThreadService> {
@Override
void onInitialize() {
Stage startStage = configuration.getStartStage();
initialize(startStage);
onStart();
Set<Stage> newThreadableStages = initialize(startStage);
startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
}
void startStageAtRuntime(final Stage newStage) {
Set<Stage> newThreadableStages = initialize(newStage);
configuration.addThreadableStage(newStage);
Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages);
sendInitializingSignal(newThreadableStages);
sendStartingSignal(newThreadableStages);
......@@ -128,13 +129,6 @@ class ThreadService extends AbstractService<ThreadService> {
}
}
@Override
void onStart() {
startThreads(threadableStages);
sendInitializingSignal(threadableStages);
}
@Override
void onExecute() {
sendStartingSignal(threadableStages);
......
......@@ -63,13 +63,7 @@ public class Traverser {
public void traverse(final Stage stage) {
VisitorBehavior behavior = traverserVisitor.visit(stage);
if (behavior == VisitorBehavior.STOP) {
return;
}
if (!visitedStages.add(stage)) {
// || stage.getCurrentState() != StageState.CREATED
// do not visit (1) an already visited stage and (2) a stage that currently run (runtime visiting)
if (behavior == VisitorBehavior.STOP || !visitedStages.add(stage)) {
return;
}
......
......@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
import teetime.framework.Configuration;
......@@ -34,7 +33,7 @@ import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.util.framework.port.PortAction;
@Ignore
//@Ignore
public class DynamicDistributorTest {
@Test
......
......@@ -22,19 +22,17 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
import teetime.framework.Configuration;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.Execution;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.merger.strategy.BusyWaitingRoundRobinStrategy;
import teetime.util.framework.port.PortAction;
@Ignore
public class DynamicMergerTest {
@Test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment