Skip to content
Snippets Groups Projects
Commit d81c69af authored by Christian Wulf's avatar Christian Wulf
Browse files

added working remove port action

parent 8b40d375
No related branches found
No related tags found
No related merge requests found
Showing
with 145 additions and 31 deletions
......@@ -15,6 +15,7 @@
*/
package teetime.framework;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
......@@ -35,6 +36,8 @@ public abstract class AbstractStage extends Stage {
private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
private StageState currentState = StageState.CREATED;
private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>();
@Override
public InputPort<?>[] getInputPorts() {
return inputPorts;
......@@ -239,8 +242,8 @@ public abstract class AbstractStage extends Stage {
@Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
int index = dynamicOutputPort.getIndex();
List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts);
tempOutputPorts.remove(index);
List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts));
OutputPort<?> removedOutputPort = tempOutputPorts.remove(index);
for (int i = index; i < tempOutputPorts.size(); i++) {
OutputPort<?> outputPort = tempOutputPorts.get(i);
if (outputPort instanceof DynamicOutputPort) {
......@@ -248,6 +251,18 @@ public abstract class AbstractStage extends Stage {
}
}
outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
fireOutputPortRemoved(removedOutputPort);
}
private void fireOutputPortRemoved(final OutputPort<?> removedOutputPort) {
for (OutputPortRemovedListener listener : outputPortRemovedListeners) {
listener.onOutputPortRemoved(this, removedOutputPort);
}
}
protected void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) {
outputPortRemovedListeners.add(outputPortRemovedListener);
}
}
package teetime.framework;
public interface OutputPortRemovedListener {
void onOutputPortRemoved(Stage stage, OutputPort<?> removedOutputPort);
}
......@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
/**
* @author Nils Christian Ehmke
......@@ -111,4 +112,9 @@ public final class CloneStrategy implements IDistributorStrategy {
return null;
}
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
// do nothing
}
}
......@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
/**
* @author Nils Christian Ehmke
......@@ -33,4 +34,8 @@ public final class CopyByReferenceStrategy implements IDistributorStrategy {
return true;
}
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
// do nothing
}
}
......@@ -36,6 +36,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
public Distributor(final IDistributorStrategy strategy) {
this.strategy = strategy;
addOutputPortRemovedListener(strategy);
}
@SuppressWarnings("unchecked")
......@@ -45,7 +46,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
}
public OutputPort<T> getNewOutputPort() {
return this.createOutputPort();
return this.createDynamicOutputPort();
}
public IDistributorStrategy getStrategy() {
......@@ -56,4 +57,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
this.strategy = strategy;
}
@Override
public OutputPort<?>[] getOutputPorts() {
return super.getOutputPorts();
}
}
......@@ -16,13 +16,14 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.framework.OutputPortRemovedListener;
/**
* @author Nils Christian Ehmke
*
* @since 1.0
*/
public interface IDistributorStrategy {
public interface IDistributorStrategy extends OutputPortRemovedListener {
public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
......
......@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
/**
* @author Nils Christian Ehmke
......@@ -43,4 +44,11 @@ public final class RoundRobinStrategy implements IDistributorStrategy {
return outputPort;
}
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
Distributor<?> distributor = (Distributor<?>) stage;
// correct the index if it is out-of-bounds
this.index = this.index % distributor.getOutputPorts().length;
}
}
......@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
/**
* @author Christian Wulf
......@@ -33,8 +34,9 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
int numLoops = numOutputPorts;
boolean success;
OutputPort<T> outputPort;
do {
final OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts);
outputPort = getNextPortInRoundRobinOrder(outputPorts);
success = outputPort.sendNonBlocking(element);
if (0 == numLoops) {
numWaits++;
......@@ -44,6 +46,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
numLoops--;
} while (!success);
System.out.println("Sent " + element + " via " + outputPort);
return true;
}
......@@ -69,4 +73,11 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
return numWaits;
}
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
Distributor<?> distributor = (Distributor<?>) stage;
// correct the index if it is out-of-bounds
this.index = this.index % distributor.getOutputPorts().length;
}
}
package teetime.stage.basic.distributor.dynamic;
class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
@Override
......@@ -7,4 +8,9 @@ class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
return portActions.take();
}
// @Override
// protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes
// return super.getOutputPorts();
// }
}
......@@ -25,6 +25,7 @@ public class CreatePortAction<T> implements PortAction<T> {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
System.out.println("Creating...");
OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
......@@ -37,5 +38,6 @@ public class CreatePortAction<T> implements PortAction<T> {
newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
System.out.println("Created.");
}
}
......@@ -40,7 +40,7 @@ public class DynamicDistributor<T> extends Distributor<T> {
private void checkForPendingPortActionRequest() throws InterruptedException {
PortAction<T> dynamicPortAction = getPortAction();
if (null != dynamicPortAction) {
if (null != dynamicPortAction) { // check if getPortAction() uses polling
dynamicPortAction.execute(this);
}
}
......
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.TerminatingSignal;
public class RemovePortAction<T> implements PortAction<T> {
private final DynamicOutputPort<T> outputPort;
public RemovePortAction(final DynamicOutputPort<T> outputPort) {
super();
this.outputPort = outputPort;
}
// public DynamicOutputPort<T> getOutputPort() {
// return outputPort;
// }
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
System.out.println("Removing...");
if (dynamicDistributor instanceof ControlledDynamicDistributor) {
OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts();
OutputPort<?> outputPortToRemove = outputPorts[outputPorts.length - 1];
// outputPortToRemove = outputPort;
outputPortToRemove.sendSignal(new TerminatingSignal());
dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove);
}
System.out.println("Removed.");
}
}
......@@ -14,13 +14,9 @@ import org.junit.Test;
import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
import teetime.stage.basic.distributor.dynamic.CreatePortAction;
import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.distributor.dynamic.PortAction;
public class ControlledDistributorTest {
......@@ -40,7 +36,8 @@ public class ControlledDistributorTest {
List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
......@@ -48,34 +45,22 @@ public class ControlledDistributorTest {
}
@Test
public void shouldWorkWithActionTriggers() throws Exception {
public void shouldWorkWithCreateActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked")
PortAction<Integer>[] inputActions = new PortAction[5];
for (int i = 0; i < inputActions.length; i++) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>();
// Runnable runnable = dynamicActuator.wrap(newStage);
// Thread thread = new Thread(runnable);
// thread.start();
PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
PortAction<Integer> createAction = createPortCreateAction();
inputActions[i] = createAction;
}
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
for (PortAction<Integer> ia : inputActions) {
Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
System.out.println("collectorSink: " + collectorSink.getElements());
}
assertThat(config.getOutputElements(), contains(0));
assertValuesForIndex(inputActions, Arrays.asList(1), 0);
assertValuesForIndex(inputActions, Arrays.asList(2), 1);
......@@ -84,6 +69,37 @@ public class ControlledDistributorTest {
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
}
@Test
public void shouldWorkWithRemoveActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
@SuppressWarnings("unchecked")
PortAction<Integer>[] inputActions = new PortAction[6];
inputActions[0] = createPortCreateAction();
inputActions[1] = new RemovePortAction<Integer>(null);
inputActions[2] = createPortCreateAction();
inputActions[3] = createPortCreateAction();
inputActions[4] = new RemovePortAction<Integer>(null);
inputActions[5] = new RemovePortAction<Integer>(null);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5));
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0);
assertValuesForIndex(inputActions, Arrays.asList(3), 2);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
}
private PortAction<Integer> createPortCreateAction() {
CollectorSink<Integer> newStage = new CollectorSink<Integer>();
PortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort());
return portAction;
}
private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
final List<Integer> values, final int index) {
PortAction<Integer> ia = inputActions[index];
......@@ -99,16 +115,13 @@ public class ControlledDistributorTest {
public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
// InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(initialElementProducer);
// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
addThreadableStage(distributor);
addThreadableStage(collectorSink);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment