Skip to content
Snippets Groups Projects
Commit 11e116df authored by Christian Wulf's avatar Christian Wulf
Browse files

added ControlledDistributor + test

parent fe89ecb1
No related branches found
No related tags found
No related merge requests found
package teetime.stage.basic.distributor;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class ControlledDistributor<T> extends AbstractStage {
private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
private static final SingleElementPipeFactory intraPipeFactory = new SingleElementPipeFactory();
// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
private final InputPort<T> inputPort = createInputPort();
private final OutputPort<T> outputPort = createOutputPort();
private final BlockingQueue<DynamicPortActionContainer<T>> actions;
public ControlledDistributor() {
final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
}
@Override
public void onStarting() throws Exception {
getDistributor(outputPort); // throws an ClassCastException if it is not a distributor
super.onStarting();
}
@Override
// first, receive exact one element from the inputPort
// second, receive exact one element from the dynamicPortActionInputPort
// next, repeat in this order
protected void executeStage() {
T element = inputPort.receive();
if (null == element) {
returnNoElement();
}
passToDistributor(element);
try {
// DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
DynamicPortActionContainer<T> dynamicPortAction = actions.take();
// DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
if (null == dynamicPortAction) {
returnNoElement();
}
checkForOutputPortChange(dynamicPortAction);
} catch (InterruptedException e) {
final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
System.out.println("" + dynamicPortAction.getDynamicPortAction());
switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE:
Distributor<T> distributor = getDistributor(outputPort);
OutputPort<T> newOutputPort = distributor.getNewOutputPort();
InputPort<T> newInputPort = dynamicPortAction.getInputPort();
// spScPipeFactory.create(newOutputPort, newInputPort);
intraPipeFactory.create(newOutputPort, newInputPort); // FIXME should be inter, but requires sending init and start signal
break;
case REMOVE:
// TODO implement "remove port at runtime"
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
}
}
@SuppressWarnings("unchecked")
private Distributor<T> getDistributor(final OutputPort<T> outputPort2) {
final Stage owningStage = outputPort.getPipe().getTargetPort().getOwningStage();
return (Distributor<T>) owningStage;
}
private void passToDistributor(final T element) {
System.out.println("Passing " + element);
outputPort.send(element);
}
public InputPort<T> getInputPort() {
return inputPort;
}
// public InputPort<DynamicPortActionContainer<T>> getDynamicPortActionInputPort() {
// return dynamicPortActionInputPort;
// }
public OutputPort<T> getOutputPort() {
return outputPort;
}
public Queue<DynamicPortActionContainer<T>> getActions() {
return actions;
}
}
......@@ -3,35 +3,12 @@ package teetime.stage.basic.distributor;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
public class DynamicDistributor<T> extends Distributor<T> {
private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
public enum DynamicPortAction {
CREATE, REMOVE;
}
public static class DynamicPortActionContainer<T> {
private final DynamicPortAction dynamicPortAction;
private final InputPort<T> inputPort;
public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
super();
this.dynamicPortAction = dynamicPortAction;
this.inputPort = inputPort;
}
public DynamicPortAction getDynamicPortAction() {
return dynamicPortAction;
}
public InputPort<T> getInputPort() {
return inputPort;
}
}
@SuppressWarnings("rawtypes")
private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class);
......@@ -39,10 +16,10 @@ public class DynamicDistributor<T> extends Distributor<T> {
@Override
protected void execute(final T element) {
DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
switch (dynamicPortAction.dynamicPortAction) {
switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE:
OutputPort<T> newOutputPort = createOutputPort();
InputPort<T> newInputPort = dynamicPortAction.inputPort;
InputPort<T> newInputPort = dynamicPortAction.getInputPort();
spScPipeFactory.create(newOutputPort, newInputPort);
break;
case REMOVE:
......@@ -50,7 +27,7 @@ public class DynamicDistributor<T> extends Distributor<T> {
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.dynamicPortAction);
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
}
......
package teetime.stage.basic.distributor;
import teetime.framework.InputPort;
public class DynamicPortActionContainer<T> {
public enum DynamicPortAction {
CREATE, REMOVE;
}
private final DynamicPortAction dynamicPortAction;
private final InputPort<T> inputPort;
public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
super();
this.dynamicPortAction = dynamicPortAction;
this.inputPort = inputPort;
}
public DynamicPortAction getDynamicPortAction() {
return dynamicPortAction;
}
public InputPort<T> getInputPort() {
return inputPort;
}
}
package teetime.stage.basic.distributor;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
public class ControlledDistributorTest {
// private ControlledDistributor<Integer> controlledDistributor;
@Before
public void setUp() throws Exception {
// controlledDistributor = new ControlledDistributor<Integer>();
}
@Test
public void shouldWorkWithoutActionTriggers() throws Exception {
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
DynamicPortAction.REMOVE, null);
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
List<DynamicPortActionContainer<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
analysis.executeBlocking();
assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4));
}
@Test
public void shouldWorkWithActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
for (int i = 0; i < inputActions.length; i++) {
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
DynamicPortAction.CREATE, new CollectorSink<Integer>().getInputPort());
inputActions[i] = createAction;
}
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
analysis.executeBlocking();
for (DynamicPortActionContainer<Integer> ia : inputActions) {
Stage stage = ia.getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
System.out.println("collectorSink: " + collectorSink.getElements());
}
assertThat(config.getOutputElements(), contains(0, 1));
assertValuesForIndex(inputActions, Arrays.asList(2), 0);
assertValuesForIndex(inputActions, Arrays.asList(3), 1);
assertValuesForIndex(inputActions, Arrays.asList(4), 2);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
}
private void assertValuesForIndex(final DynamicPortActionContainer<Integer>[] inputActions,
final List<Integer> values, final int index) {
DynamicPortActionContainer<Integer> ia = inputActions[index];
Stage stage = ia.getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
assertThat(collectorSink.getElements(), is(values));
}
private static class ControlledDistributorTestConfig<T> extends AnalysisConfiguration {
private final CollectorSink<T> collectorSink;
public ControlledDistributorTestConfig(final List<T> elements, final List<DynamicPortActionContainer<T>> actions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
// InitialElementProducer<DynamicPortActionContainer<T>> initialActionProducer = new InitialElementProducer<DynamicPortActionContainer<T>>(actions);
ControlledDistributor<T> controlledDistributor = new ControlledDistributor<T>();
Distributor<T> distributor = new Distributor<T>();
collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), controlledDistributor.getInputPort());
// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
connectPorts(controlledDistributor.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(initialElementProducer);
// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
addThreadableStage(controlledDistributor);
addThreadableStage(collectorSink);
controlledDistributor.getActions().addAll(actions);
}
public List<T> getOutputElements() {
return collectorSink.getElements();
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment