Skip to content
Snippets Groups Projects
Commit b13da110 authored by Christian Wulf's avatar Christian Wulf
Browse files

added port actions

parent 9c782dca
No related branches found
No related tags found
No related merge requests found
Showing
with 177 additions and 166 deletions
...@@ -194,7 +194,7 @@ public abstract class AbstractStage extends Stage { ...@@ -194,7 +194,7 @@ public abstract class AbstractStage extends Stage {
return outputPort; return outputPort;
} }
private <T> T[] addElementToArray(final T element, final T[] srcArray) { private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) {
T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1); T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1);
newOutputPorts[srcArray.length] = element; newOutputPorts[srcArray.length] = element;
return newOutputPorts; return newOutputPorts;
...@@ -230,4 +230,24 @@ public abstract class AbstractStage extends Stage { ...@@ -230,4 +230,24 @@ public abstract class AbstractStage extends Stage {
return TerminationStrategy.BY_SIGNAL; return TerminationStrategy.BY_SIGNAL;
} }
protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length);
outputPorts = addElementToArray(outputPort, outputPorts);
return outputPort;
}
@Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
int index = dynamicOutputPort.getIndex();
List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts);
tempOutputPorts.remove(index);
for (int i = index; i < tempOutputPorts.size(); i++) {
OutputPort<?> outputPort = tempOutputPorts.get(i);
if (outputPort instanceof DynamicOutputPort) {
((DynamicOutputPort<?>) outputPort).setIndex(i);
}
}
outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
}
} }
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be sent
*
* @since 1.2
*/
public final class DynamicOutputPort<T> extends OutputPort<T> {
private int index;
DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
...@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal; ...@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal;
* *
* @since 1.0 * @since 1.0
*/ */
public final class OutputPort<T> extends AbstractPort<T> { public class OutputPort<T> extends AbstractPort<T> {
OutputPort(final Class<T> type, final Stage owningStage) { OutputPort(final Class<T> type, final Stage owningStage) {
super(type, owningStage); super(type, owningStage);
......
...@@ -148,4 +148,6 @@ public abstract class Stage { ...@@ -148,4 +148,6 @@ public abstract class Stage {
this.exceptionHandler = exceptionHandler; this.exceptionHandler = exceptionHandler;
} }
protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort);
} }
package teetime.stage.basic.distributor;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class ControlledDistributor<T> extends AbstractStage {
// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
private final InputPort<T> inputPort = createInputPort();
private final OutputPort<T> outputPort = createOutputPort();
private final BlockingQueue<DynamicPortActionContainer<T>> actions;
public ControlledDistributor() {
final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
}
@Override
public void onStarting() throws Exception {
getDistributor(outputPort); // throws an ClassCastException if it is not a distributor
super.onStarting();
}
@Override
// first, receive exact one element from the inputPort
// second, receive exact one element from the dynamicPortActionInputPort
// next, repeat in this order
protected void executeStage() {
T element = inputPort.receive();
if (null == element) {
returnNoElement();
}
passToDistributor(element);
try {
// DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
DynamicPortActionContainer<T> dynamicPortAction = actions.take();
// DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
if (null == dynamicPortAction) {
returnNoElement();
}
checkForOutputPortChange(dynamicPortAction);
} catch (InterruptedException e) {
final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
System.out.println("" + dynamicPortAction.getDynamicPortAction());
switch (dynamicPortAction.getDynamicPortAction()) {
case CREATE:
Distributor<T> distributor = getDistributor(outputPort);
OutputPort<T> newOutputPort = distributor.getNewOutputPort();
dynamicPortAction.execute(newOutputPort);
break;
case REMOVE:
// TODO implement "remove port at runtime"
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
}
}
@SuppressWarnings("unchecked")
private Distributor<T> getDistributor(final OutputPort<T> outputPort2) {
final Stage owningStage = outputPort.getPipe().getTargetPort().getOwningStage();
return (Distributor<T>) owningStage;
}
private void passToDistributor(final T element) {
System.out.println("Passing " + element);
outputPort.send(element);
}
public InputPort<T> getInputPort() {
return inputPort;
}
// public InputPort<DynamicPortActionContainer<T>> getDynamicPortActionInputPort() {
// return dynamicPortActionInputPort;
// }
public OutputPort<T> getOutputPort() {
return outputPort;
}
public Queue<DynamicPortActionContainer<T>> getActions() {
return actions;
}
}
package teetime.stage.basic.distributor.dynamic;
public class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
@Override
protected PortAction<T> getPortAction() throws InterruptedException {
return portActions.take();
}
}
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicActuator; import teetime.framework.DynamicActuator;
import teetime.framework.InputPort; import teetime.framework.InputPort;
...@@ -7,37 +7,29 @@ import teetime.framework.pipe.SpScPipeFactory; ...@@ -7,37 +7,29 @@ import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
public class DynamicPortActionContainer<T> { public class CreatePortAction<T> implements PortAction<T> {
private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator();
private final DynamicActuator dynamicActuator = new DynamicActuator();
public enum DynamicPortAction {
CREATE, REMOVE;
}
private final DynamicPortAction dynamicPortAction;
private final InputPort<T> inputPort; private final InputPort<T> inputPort;
public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) { public CreatePortAction(final InputPort<T> inputPort) {
super(); super();
this.dynamicPortAction = dynamicPortAction;
this.inputPort = inputPort; this.inputPort = inputPort;
} }
public DynamicPortAction getDynamicPortAction() {
return dynamicPortAction;
}
public InputPort<T> getInputPort() { public InputPort<T> getInputPort() {
return inputPort; return inputPort;
} }
public void execute(final OutputPort<T> newOutputPort) { @Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage()); Runnable runnable = DYNAMIC_ACTUATOR.wrap(inputPort.getOwningStage());
Thread thread = new Thread(runnable); Thread thread = new Thread(runnable);
thread.start(); thread.start();
...@@ -46,5 +38,4 @@ public class DynamicPortActionContainer<T> { ...@@ -46,5 +38,4 @@ public class DynamicPortActionContainer<T> {
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end // FIXME pass the new thread to the analysis so that it can terminate the thread at the end
} }
} }
package teetime.stage.basic.distributor.dynamic;
public class DoNothingPortAction<T> implements PortAction<T> {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
// do nothing for testing purpose
}
}
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor.dynamic;
import java.util.Queue; import java.util.Queue;
...@@ -7,8 +7,8 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; ...@@ -7,8 +7,8 @@ import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
import teetime.framework.OutputPort; import teetime.framework.DynamicOutputPort;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.stage.basic.distributor.Distributor;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
...@@ -17,37 +17,44 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy; ...@@ -17,37 +17,44 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class DynamicDistributor<T> extends Distributor<T> { public class DynamicDistributor<T> extends Distributor<T> {
private final PCBlockingQueue<DynamicPortActionContainer<T>> actions; protected final PCBlockingQueue<PortAction<T>> portActions;
public DynamicDistributor() { public DynamicDistributor() {
final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); final Queue<PortAction<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>(); final PutStrategy<PortAction<T>> putStrategy = new YieldPutStrategy<PortAction<T>>();
final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>(); final TakeStrategy<PortAction<T>> takeStrategy = new SCParkTakeStrategy<PortAction<T>>();
actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy); portActions = new PCBlockingQueue<PortAction<T>>(localQueue, putStrategy, takeStrategy);
} }
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
checkForPendingPortActionRequest(); try {
checkForPendingPortActionRequest();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
super.execute(element); super.execute(element);
} }
private void checkForPendingPortActionRequest() { private void checkForPendingPortActionRequest() throws InterruptedException {
DynamicPortActionContainer<T> dynamicPortAction = actions.poll(); PortAction<T> dynamicPortAction = getPortAction();
switch (dynamicPortAction.getDynamicPortAction()) { if (null != dynamicPortAction) {
case CREATE: dynamicPortAction.execute(this);
OutputPort<T> newOutputPort = createOutputPort();
dynamicPortAction.execute(newOutputPort);
break;
case REMOVE:
// TODO implement "remove port at runtime"
break;
default:
if (logger.isWarnEnabled()) {
logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
}
break;
} }
} }
protected PortAction<T> getPortAction() throws InterruptedException {
return portActions.poll();
}
@Override
public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
super.removeDynamicPort(dynamicOutputPort);
}
public boolean addPortActionRequest(final PortAction<T> newPortActionRequest) {
return portActions.offer(newPortActionRequest);
}
} }
package teetime.stage.basic.distributor.dynamic;
public interface PortAction<T> {
public abstract void execute(final DynamicDistributor<T> dynamicDistributor);
}
...@@ -16,7 +16,11 @@ import teetime.framework.AnalysisConfiguration; ...@@ -16,7 +16,11 @@ import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
import teetime.stage.basic.distributor.dynamic.CreatePortAction;
import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.distributor.dynamic.PortAction;
public class ControlledDistributorTest { public class ControlledDistributorTest {
...@@ -29,12 +33,11 @@ public class ControlledDistributorTest { ...@@ -29,12 +33,11 @@ public class ControlledDistributorTest {
@Test @Test
public void shouldWorkWithoutActionTriggers() throws Exception { public void shouldWorkWithoutActionTriggers() throws Exception {
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>( PortAction<Integer> createAction = new DoNothingPortAction<Integer>();
DynamicPortAction.REMOVE, null);
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<DynamicPortActionContainer<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions); ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config); Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
...@@ -49,7 +52,7 @@ public class ControlledDistributorTest { ...@@ -49,7 +52,7 @@ public class ControlledDistributorTest {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5]; PortAction<Integer>[] inputActions = new PortAction[5];
for (int i = 0; i < inputActions.length; i++) { for (int i = 0; i < inputActions.length; i++) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>(); CollectorSink<Integer> newStage = new CollectorSink<Integer>();
...@@ -57,8 +60,7 @@ public class ControlledDistributorTest { ...@@ -57,8 +60,7 @@ public class ControlledDistributorTest {
// Thread thread = new Thread(runnable); // Thread thread = new Thread(runnable);
// thread.start(); // thread.start();
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>( PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
DynamicPortAction.CREATE, newStage.getInputPort());
inputActions[i] = createAction; inputActions[i] = createAction;
} }
...@@ -67,25 +69,25 @@ public class ControlledDistributorTest { ...@@ -67,25 +69,25 @@ public class ControlledDistributorTest {
analysis.executeBlocking(); analysis.executeBlocking();
for (DynamicPortActionContainer<Integer> ia : inputActions) { for (PortAction<Integer> ia : inputActions) {
Stage stage = ia.getInputPort().getOwningStage(); Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
System.out.println("collectorSink: " + collectorSink.getElements()); System.out.println("collectorSink: " + collectorSink.getElements());
} }
assertThat(config.getOutputElements(), contains(0, 1)); assertThat(config.getOutputElements(), contains(0));
assertValuesForIndex(inputActions, Arrays.asList(2), 0); assertValuesForIndex(inputActions, Arrays.asList(1), 0);
assertValuesForIndex(inputActions, Arrays.asList(3), 1); assertValuesForIndex(inputActions, Arrays.asList(2), 1);
assertValuesForIndex(inputActions, Arrays.asList(4), 2); assertValuesForIndex(inputActions, Arrays.asList(3), 2);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3); assertValuesForIndex(inputActions, Arrays.asList(4), 3);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4); assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
} }
private void assertValuesForIndex(final DynamicPortActionContainer<Integer>[] inputActions, private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
final List<Integer> values, final int index) { final List<Integer> values, final int index) {
DynamicPortActionContainer<Integer> ia = inputActions[index]; PortAction<Integer> ia = inputActions[index];
Stage stage = ia.getInputPort().getOwningStage(); Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
assertThat(collectorSink.getElements(), is(values)); assertThat(collectorSink.getElements(), is(values));
...@@ -95,24 +97,24 @@ public class ControlledDistributorTest { ...@@ -95,24 +97,24 @@ public class ControlledDistributorTest {
private final CollectorSink<T> collectorSink; private final CollectorSink<T> collectorSink;
public ControlledDistributorTestConfig(final List<T> elements, final List<DynamicPortActionContainer<T>> actions) { public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
// InitialElementProducer<DynamicPortActionContainer<T>> initialActionProducer = new InitialElementProducer<DynamicPortActionContainer<T>>(actions); // InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
ControlledDistributor<T> controlledDistributor = new ControlledDistributor<T>(); DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
Distributor<T> distributor = new Distributor<T>();
collectorSink = new CollectorSink<T>(); collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), controlledDistributor.getInputPort()); connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort()); // connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
connectPorts(controlledDistributor.getOutputPort(), distributor.getInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(initialElementProducer); addThreadableStage(initialElementProducer);
// addThreadableStage(initialActionProducer); // simulates the AdaptationThread // addThreadableStage(initialActionProducer); // simulates the AdaptationThread
addThreadableStage(controlledDistributor); addThreadableStage(distributor);
addThreadableStage(collectorSink); addThreadableStage(collectorSink);
controlledDistributor.getActions().addAll(actions); for (PortAction<T> a : portActions) {
distributor.addPortActionRequest(a);
}
} }
public List<T> getOutputElements() { public List<T> getOutputElements() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment