Skip to content
Snippets Groups Projects
Commit 96f3df55 authored by Christian Wulf's avatar Christian Wulf
Browse files

refactoring

parent 19c75489
No related branches found
No related tags found
No related merge requests found
...@@ -9,15 +9,11 @@ import org.jctools.queues.spec.Ordering; ...@@ -9,15 +9,11 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractStage; import teetime.framework.AbstractStage;
import teetime.framework.DynamicActuator;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException; import teetime.framework.exceptionHandling.StageException;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.PutStrategy;
...@@ -27,8 +23,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy; ...@@ -27,8 +23,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class ControlledDistributor<T> extends AbstractStage { public class ControlledDistributor<T> extends AbstractStage {
private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort(); // private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
private final InputPort<T> inputPort = createInputPort(); private final InputPort<T> inputPort = createInputPort();
...@@ -77,8 +71,6 @@ public class ControlledDistributor<T> extends AbstractStage { ...@@ -77,8 +71,6 @@ public class ControlledDistributor<T> extends AbstractStage {
} }
} }
private final DynamicActuator dynamicActuator = new DynamicActuator();
private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) { private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
System.out.println("" + dynamicPortAction.getDynamicPortAction()); System.out.println("" + dynamicPortAction.getDynamicPortAction());
...@@ -86,17 +78,7 @@ public class ControlledDistributor<T> extends AbstractStage { ...@@ -86,17 +78,7 @@ public class ControlledDistributor<T> extends AbstractStage {
case CREATE: case CREATE:
Distributor<T> distributor = getDistributor(outputPort); Distributor<T> distributor = getDistributor(outputPort);
OutputPort<T> newOutputPort = distributor.getNewOutputPort(); OutputPort<T> newOutputPort = distributor.getNewOutputPort();
InputPort<T> newInputPort = dynamicPortAction.getInputPort(); dynamicPortAction.execute(newOutputPort);
spScPipeFactory.create(newOutputPort, newInputPort);
Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage());
Thread thread = new Thread(runnable);
thread.start();
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
break; break;
case REMOVE: case REMOVE:
// TODO implement "remove port at runtime" // TODO implement "remove port at runtime"
......
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.InputPort; import java.util.Queue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class DynamicDistributor<T> extends Distributor<T> { public class DynamicDistributor<T> extends Distributor<T> {
private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); private final PCBlockingQueue<DynamicPortActionContainer<T>> actions;
@SuppressWarnings("rawtypes") public DynamicDistributor() {
private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class); final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
}
@SuppressWarnings("unchecked")
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive(); checkForPendingPortActionRequest();
super.execute(element);
}
private void checkForPendingPortActionRequest() {
DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
switch (dynamicPortAction.getDynamicPortAction()) { switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE: case CREATE:
OutputPort<T> newOutputPort = createOutputPort(); OutputPort<T> newOutputPort = createOutputPort();
InputPort<T> newInputPort = dynamicPortAction.getInputPort(); dynamicPortAction.execute(newOutputPort);
spScPipeFactory.create(newOutputPort, newInputPort);
break; break;
case REMOVE: case REMOVE:
// TODO implement "remove port at runtime" // TODO implement "remove port at runtime"
...@@ -31,7 +49,5 @@ public class DynamicDistributor<T> extends Distributor<T> { ...@@ -31,7 +49,5 @@ public class DynamicDistributor<T> extends Distributor<T> {
} }
break; break;
} }
super.execute(element);
} }
} }
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.DynamicActuator;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
public class DynamicPortActionContainer<T> { public class DynamicPortActionContainer<T> {
private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
private final DynamicActuator dynamicActuator = new DynamicActuator();
public enum DynamicPortAction { public enum DynamicPortAction {
CREATE, REMOVE; CREATE, REMOVE;
} }
...@@ -25,4 +34,17 @@ public class DynamicPortActionContainer<T> { ...@@ -25,4 +34,17 @@ public class DynamicPortActionContainer<T> {
return inputPort; return inputPort;
} }
public void execute(final OutputPort<T> newOutputPort) {
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage());
Thread thread = new Thread(runnable);
thread.start();
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
}
} }
...@@ -13,7 +13,6 @@ import org.junit.Test; ...@@ -13,7 +13,6 @@ import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisConfiguration;
import teetime.framework.DynamicActuator;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
...@@ -22,7 +21,6 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct ...@@ -22,7 +21,6 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct
public class ControlledDistributorTest { public class ControlledDistributorTest {
// private ControlledDistributor<Integer> controlledDistributor; // private ControlledDistributor<Integer> controlledDistributor;
private final DynamicActuator dynamicActuator = new DynamicActuator();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment