Skip to content
Snippets Groups Projects
Commit f3154842 authored by Christian Wulf's avatar Christian Wulf
Browse files

added port actions

parent 96f3df55
No related branches found
No related tags found
No related merge requests found
Showing
with 177 additions and 166 deletions
......@@ -266,7 +266,7 @@ public abstract class AbstractStage extends Stage {
return outputPort;
}
private <T> T[] addElementToArray(final T element, final T[] srcArray) {
private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) {
T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1);
newOutputPorts[srcArray.length] = element;
return newOutputPorts;
......@@ -302,4 +302,24 @@ public abstract class AbstractStage extends Stage {
return TerminationStrategy.BY_SIGNAL;
}
protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length);
outputPorts = addElementToArray(outputPort, outputPorts);
return outputPort;
}
@Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
int index = dynamicOutputPort.getIndex();
List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts);
tempOutputPorts.remove(index);
for (int i = index; i < tempOutputPorts.size(); i++) {
OutputPort<?> outputPort = tempOutputPorts.get(i);
if (outputPort instanceof DynamicOutputPort) {
((DynamicOutputPort<?>) outputPort).setIndex(i);
}
}
outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
}
}
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be sent
*
* @since 1.2
*/
public final class DynamicOutputPort<T> extends OutputPort<T> {
private int index;
DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
......@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal;
*
* @since 1.0
*/
public final class OutputPort<T> extends AbstractPort<T> {
public class OutputPort<T> extends AbstractPort<T> {
OutputPort(final Class<T> type, final Stage owningStage, final String portName) {
super(type, owningStage, portName);
......
......@@ -148,4 +148,6 @@ public abstract class Stage {
this.exceptionHandler = exceptionHandler;
}
protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort);
}
package teetime.stage.basic.distributor;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class ControlledDistributor<T> extends AbstractStage {
// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
private final InputPort<T> inputPort = createInputPort();
private final OutputPort<T> outputPort = createOutputPort();
private final BlockingQueue<DynamicPortActionContainer<T>> actions;
public ControlledDistributor() {
final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
}
@Override
public void onStarting() throws Exception {
getDistributor(outputPort); // throws an ClassCastException if it is not a distributor
super.onStarting();
}
@Override
// first, receive exact one element from the inputPort
// second, receive exact one element from the dynamicPortActionInputPort
// next, repeat in this order
protected void executeStage() {
T element = inputPort.receive();
if (null == element) {
returnNoElement();
}
passToDistributor(element);
try {
// DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
DynamicPortActionContainer<T> dynamicPortAction = actions.take();
// DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
if (null == dynamicPortAction) {
returnNoElement();
}
checkForOutputPortChange(dynamicPortAction);
} catch (InterruptedException e) {
final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
System.out.println("" + dynamicPortAction.getDynamicPortAction());
switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE:
Distributor<T> distributor = getDistributor(outputPort);
OutputPort<T> newOutputPort = distributor.getNewOutputPort();
dynamicPortAction.execute(newOutputPort);
break;
case REMOVE:
// TODO implement "remove port at runtime"
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
}
}
@SuppressWarnings("unchecked")
private Distributor<T> getDistributor(final OutputPort<T> outputPort2) {
final Stage owningStage = outputPort.getPipe().getTargetPort().getOwningStage();
return (Distributor<T>) owningStage;
}
private void passToDistributor(final T element) {
System.out.println("Passing " + element);
outputPort.send(element);
}
public InputPort<T> getInputPort() {
return inputPort;
}
// public InputPort<DynamicPortActionContainer<T>> getDynamicPortActionInputPort() {
// return dynamicPortActionInputPort;
// }
public OutputPort<T> getOutputPort() {
return outputPort;
}
public Queue<DynamicPortActionContainer<T>> getActions() {
return actions;
}
}
package teetime.stage.basic.distributor.dynamic;
public class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
@Override
protected PortAction<T> getPortAction() throws InterruptedException {
return portActions.take();
}
}
package teetime.stage.basic.distributor;
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicActuator;
import teetime.framework.InputPort;
......@@ -7,37 +7,29 @@ import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
public class DynamicPortActionContainer<T> {
public class CreatePortAction<T> implements PortAction<T> {
private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator();
private final DynamicActuator dynamicActuator = new DynamicActuator();
public enum DynamicPortAction {
CREATE, REMOVE;
}
private final DynamicPortAction dynamicPortAction;
private final InputPort<T> inputPort;
public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
public CreatePortAction(final InputPort<T> inputPort) {
super();
this.dynamicPortAction = dynamicPortAction;
this.inputPort = inputPort;
}
public DynamicPortAction getDynamicPortAction() {
return dynamicPortAction;
}
public InputPort<T> getInputPort() {
return inputPort;
}
public void execute(final OutputPort<T> newOutputPort) {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage());
Runnable runnable = DYNAMIC_ACTUATOR.wrap(inputPort.getOwningStage());
Thread thread = new Thread(runnable);
thread.start();
......@@ -46,5 +38,4 @@ public class DynamicPortActionContainer<T> {
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
}
}
package teetime.stage.basic.distributor.dynamic;
public class DoNothingPortAction<T> implements PortAction<T> {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
// do nothing for testing purpose
}
}
package teetime.stage.basic.distributor;
package teetime.stage.basic.distributor.dynamic;
import java.util.Queue;
......@@ -7,8 +7,8 @@ import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.framework.DynamicOutputPort;
import teetime.stage.basic.distributor.Distributor;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
......@@ -17,37 +17,44 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class DynamicDistributor<T> extends Distributor<T> {
private final PCBlockingQueue<DynamicPortActionContainer<T>> actions;
protected final PCBlockingQueue<PortAction<T>> portActions;
public DynamicDistributor() {
final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
final Queue<PortAction<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<PortAction<T>> putStrategy = new YieldPutStrategy<PortAction<T>>();
final TakeStrategy<PortAction<T>> takeStrategy = new SCParkTakeStrategy<PortAction<T>>();
portActions = new PCBlockingQueue<PortAction<T>>(localQueue, putStrategy, takeStrategy);
}
@Override
protected void execute(final T element) {
checkForPendingPortActionRequest();
try {
checkForPendingPortActionRequest();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
super.execute(element);
}
private void checkForPendingPortActionRequest() {
DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE:
OutputPort<T> newOutputPort = createOutputPort();
dynamicPortAction.execute(newOutputPort);
break;
case REMOVE:
// TODO implement "remove port at runtime"
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
private void checkForPendingPortActionRequest() throws InterruptedException {
PortAction<T> dynamicPortAction = getPortAction();
if (null != dynamicPortAction) {
dynamicPortAction.execute(this);
}
}
protected PortAction<T> getPortAction() throws InterruptedException {
return portActions.poll();
}
@Override
public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
super.removeDynamicPort(dynamicOutputPort);
}
public boolean addPortActionRequest(final PortAction<T> newPortActionRequest) {
return portActions.offer(newPortActionRequest);
}
}
package teetime.stage.basic.distributor.dynamic;
public interface PortAction<T> {
public abstract void execute(final DynamicDistributor<T> dynamicDistributor);
}
......@@ -16,7 +16,11 @@ import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
import teetime.stage.basic.distributor.dynamic.CreatePortAction;
import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.distributor.dynamic.PortAction;
public class ControlledDistributorTest {
......@@ -29,12 +33,11 @@ public class ControlledDistributorTest {
@Test
public void shouldWorkWithoutActionTriggers() throws Exception {
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
DynamicPortAction.REMOVE, null);
PortAction<Integer> createAction = new DoNothingPortAction<Integer>();
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
List<DynamicPortActionContainer<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
......@@ -49,7 +52,7 @@ public class ControlledDistributorTest {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
PortAction<Integer>[] inputActions = new PortAction[5];
for (int i = 0; i < inputActions.length; i++) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>();
......@@ -57,8 +60,7 @@ public class ControlledDistributorTest {
// Thread thread = new Thread(runnable);
// thread.start();
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
DynamicPortAction.CREATE, newStage.getInputPort());
PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
inputActions[i] = createAction;
}
......@@ -67,25 +69,25 @@ public class ControlledDistributorTest {
analysis.executeBlocking();
for (DynamicPortActionContainer<Integer> ia : inputActions) {
Stage stage = ia.getInputPort().getOwningStage();
for (PortAction<Integer> ia : inputActions) {
Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
System.out.println("collectorSink: " + collectorSink.getElements());
}
assertThat(config.getOutputElements(), contains(0, 1));
assertValuesForIndex(inputActions, Arrays.asList(2), 0);
assertValuesForIndex(inputActions, Arrays.asList(3), 1);
assertValuesForIndex(inputActions, Arrays.asList(4), 2);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
assertThat(config.getOutputElements(), contains(0));
assertValuesForIndex(inputActions, Arrays.asList(1), 0);
assertValuesForIndex(inputActions, Arrays.asList(2), 1);
assertValuesForIndex(inputActions, Arrays.asList(3), 2);
assertValuesForIndex(inputActions, Arrays.asList(4), 3);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
}
private void assertValuesForIndex(final DynamicPortActionContainer<Integer>[] inputActions,
private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
final List<Integer> values, final int index) {
DynamicPortActionContainer<Integer> ia = inputActions[index];
Stage stage = ia.getInputPort().getOwningStage();
PortAction<Integer> ia = inputActions[index];
Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
assertThat(collectorSink.getElements(), is(values));
......@@ -95,24 +97,24 @@ public class ControlledDistributorTest {
private final CollectorSink<T> collectorSink;
public ControlledDistributorTestConfig(final List<T> elements, final List<DynamicPortActionContainer<T>> actions) {
public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
// InitialElementProducer<DynamicPortActionContainer<T>> initialActionProducer = new InitialElementProducer<DynamicPortActionContainer<T>>(actions);
ControlledDistributor<T> controlledDistributor = new ControlledDistributor<T>();
Distributor<T> distributor = new Distributor<T>();
// InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), controlledDistributor.getInputPort());
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
connectPorts(controlledDistributor.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(initialElementProducer);
// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
addThreadableStage(controlledDistributor);
addThreadableStage(distributor);
addThreadableStage(collectorSink);
controlledDistributor.getActions().addAll(actions);
for (PortAction<T> a : portActions) {
distributor.addPortActionRequest(a);
}
}
public List<T> getOutputElements() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment