Skip to content
Snippets Groups Projects
Commit b28869e1 authored by Christian Wulf's avatar Christian Wulf
Browse files

added working remove port action

parent 84738bfb
No related branches found
No related tags found
No related merge requests found
Showing
with 145 additions and 31 deletions
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
...@@ -35,6 +36,8 @@ public abstract class AbstractStage extends Stage { ...@@ -35,6 +36,8 @@ public abstract class AbstractStage extends Stage {
private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
private StageState currentState = StageState.CREATED; private StageState currentState = StageState.CREATED;
private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>();
@Override @Override
public InputPort<?>[] getInputPorts() { public InputPort<?>[] getInputPorts() {
return inputPorts; return inputPorts;
...@@ -311,8 +314,8 @@ public abstract class AbstractStage extends Stage { ...@@ -311,8 +314,8 @@ public abstract class AbstractStage extends Stage {
@Override @Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
int index = dynamicOutputPort.getIndex(); int index = dynamicOutputPort.getIndex();
List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts); List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts));
tempOutputPorts.remove(index); OutputPort<?> removedOutputPort = tempOutputPorts.remove(index);
for (int i = index; i < tempOutputPorts.size(); i++) { for (int i = index; i < tempOutputPorts.size(); i++) {
OutputPort<?> outputPort = tempOutputPorts.get(i); OutputPort<?> outputPort = tempOutputPorts.get(i);
if (outputPort instanceof DynamicOutputPort) { if (outputPort instanceof DynamicOutputPort) {
...@@ -320,6 +323,18 @@ public abstract class AbstractStage extends Stage { ...@@ -320,6 +323,18 @@ public abstract class AbstractStage extends Stage {
} }
} }
outputPorts = tempOutputPorts.toArray(new OutputPort[0]); outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
fireOutputPortRemoved(removedOutputPort);
}
private void fireOutputPortRemoved(final OutputPort<?> removedOutputPort) {
for (OutputPortRemovedListener listener : outputPortRemovedListeners) {
listener.onOutputPortRemoved(this, removedOutputPort);
}
}
protected void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) {
outputPortRemovedListeners.add(outputPortRemovedListener);
} }
} }
package teetime.framework;
public interface OutputPortRemovedListener {
void onOutputPortRemoved(Stage stage, OutputPort<?> removedOutputPort);
}
...@@ -22,6 +22,7 @@ import java.util.Collection; ...@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
...@@ -111,4 +112,9 @@ public final class CloneStrategy implements IDistributorStrategy { ...@@ -111,4 +112,9 @@ public final class CloneStrategy implements IDistributorStrategy {
return null; return null;
} }
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
// do nothing
}
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
...@@ -33,4 +34,8 @@ public final class CopyByReferenceStrategy implements IDistributorStrategy { ...@@ -33,4 +34,8 @@ public final class CopyByReferenceStrategy implements IDistributorStrategy {
return true; return true;
} }
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
// do nothing
}
} }
...@@ -36,6 +36,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { ...@@ -36,6 +36,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
public Distributor(final IDistributorStrategy strategy) { public Distributor(final IDistributorStrategy strategy) {
this.strategy = strategy; this.strategy = strategy;
addOutputPortRemovedListener(strategy);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -45,7 +46,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { ...@@ -45,7 +46,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
} }
public OutputPort<T> getNewOutputPort() { public OutputPort<T> getNewOutputPort() {
return this.createOutputPort(); return this.createDynamicOutputPort();
} }
public IDistributorStrategy getStrategy() { public IDistributorStrategy getStrategy() {
...@@ -56,4 +57,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> { ...@@ -56,4 +57,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
this.strategy = strategy; this.strategy = strategy;
} }
@Override
public OutputPort<?>[] getOutputPorts() {
return super.getOutputPorts();
}
} }
...@@ -16,13 +16,14 @@ ...@@ -16,13 +16,14 @@
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.OutputPortRemovedListener;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.0 * @since 1.0
*/ */
public interface IDistributorStrategy { public interface IDistributorStrategy extends OutputPortRemovedListener {
public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
...@@ -43,4 +44,11 @@ public final class RoundRobinStrategy implements IDistributorStrategy { ...@@ -43,4 +44,11 @@ public final class RoundRobinStrategy implements IDistributorStrategy {
return outputPort; return outputPort;
} }
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
Distributor<?> distributor = (Distributor<?>) stage;
// correct the index if it is out-of-bounds
this.index = this.index % distributor.getOutputPorts().length;
}
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.stage.basic.distributor; package teetime.stage.basic.distributor;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage;
/** /**
* @author Christian Wulf * @author Christian Wulf
...@@ -33,8 +34,9 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { ...@@ -33,8 +34,9 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
int numLoops = numOutputPorts; int numLoops = numOutputPorts;
boolean success; boolean success;
OutputPort<T> outputPort;
do { do {
final OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); outputPort = getNextPortInRoundRobinOrder(outputPorts);
success = outputPort.sendNonBlocking(element); success = outputPort.sendNonBlocking(element);
if (0 == numLoops) { if (0 == numLoops) {
numWaits++; numWaits++;
...@@ -44,6 +46,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { ...@@ -44,6 +46,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
numLoops--; numLoops--;
} while (!success); } while (!success);
System.out.println("Sent " + element + " via " + outputPort);
return true; return true;
} }
...@@ -69,4 +73,11 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { ...@@ -69,4 +73,11 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
return numWaits; return numWaits;
} }
@Override
public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
Distributor<?> distributor = (Distributor<?>) stage;
// correct the index if it is out-of-bounds
this.index = this.index % distributor.getOutputPorts().length;
}
} }
package teetime.stage.basic.distributor.dynamic; package teetime.stage.basic.distributor.dynamic;
class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
@Override @Override
...@@ -7,4 +8,9 @@ class ControlledDynamicDistributor<T> extends DynamicDistributor<T> { ...@@ -7,4 +8,9 @@ class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
return portActions.take(); return portActions.take();
} }
// @Override
// protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes
// return super.getOutputPorts();
// }
} }
...@@ -25,6 +25,7 @@ public class CreatePortAction<T> implements PortAction<T> { ...@@ -25,6 +25,7 @@ public class CreatePortAction<T> implements PortAction<T> {
@Override @Override
public void execute(final DynamicDistributor<T> dynamicDistributor) { public void execute(final DynamicDistributor<T> dynamicDistributor) {
System.out.println("Creating...");
OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort(); OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
...@@ -37,5 +38,6 @@ public class CreatePortAction<T> implements PortAction<T> { ...@@ -37,5 +38,6 @@ public class CreatePortAction<T> implements PortAction<T> {
newOutputPort.sendSignal(new StartingSignal()); newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end // FIXME pass the new thread to the analysis so that it can terminate the thread at the end
System.out.println("Created.");
} }
} }
...@@ -40,7 +40,7 @@ public class DynamicDistributor<T> extends Distributor<T> { ...@@ -40,7 +40,7 @@ public class DynamicDistributor<T> extends Distributor<T> {
private void checkForPendingPortActionRequest() throws InterruptedException { private void checkForPendingPortActionRequest() throws InterruptedException {
PortAction<T> dynamicPortAction = getPortAction(); PortAction<T> dynamicPortAction = getPortAction();
if (null != dynamicPortAction) { if (null != dynamicPortAction) { // check if getPortAction() uses polling
dynamicPortAction.execute(this); dynamicPortAction.execute(this);
} }
} }
......
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.TerminatingSignal;
public class RemovePortAction<T> implements PortAction<T> {
private final DynamicOutputPort<T> outputPort;
public RemovePortAction(final DynamicOutputPort<T> outputPort) {
super();
this.outputPort = outputPort;
}
// public DynamicOutputPort<T> getOutputPort() {
// return outputPort;
// }
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
System.out.println("Removing...");
if (dynamicDistributor instanceof ControlledDynamicDistributor) {
OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts();
OutputPort<?> outputPortToRemove = outputPorts[outputPorts.length - 1];
// outputPortToRemove = outputPort;
outputPortToRemove.sendSignal(new TerminatingSignal());
dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove);
}
System.out.println("Removed.");
}
}
...@@ -14,13 +14,9 @@ import org.junit.Test; ...@@ -14,13 +14,9 @@ import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisConfiguration;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
import teetime.stage.basic.distributor.dynamic.CreatePortAction;
import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.distributor.dynamic.PortAction;
public class ControlledDistributorTest { public class ControlledDistributorTest {
...@@ -40,7 +36,8 @@ public class ControlledDistributorTest { ...@@ -40,7 +36,8 @@ public class ControlledDistributorTest {
List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction); List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions); ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config); Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking(); analysis.executeBlocking();
...@@ -48,34 +45,22 @@ public class ControlledDistributorTest { ...@@ -48,34 +45,22 @@ public class ControlledDistributorTest {
} }
@Test @Test
public void shouldWorkWithActionTriggers() throws Exception { public void shouldWorkWithCreateActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4); List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
PortAction<Integer>[] inputActions = new PortAction[5]; PortAction<Integer>[] inputActions = new PortAction[5];
for (int i = 0; i < inputActions.length; i++) { for (int i = 0; i < inputActions.length; i++) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>(); PortAction<Integer> createAction = createPortCreateAction();
// Runnable runnable = dynamicActuator.wrap(newStage);
// Thread thread = new Thread(runnable);
// thread.start();
PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
inputActions[i] = createAction; inputActions[i] = createAction;
} }
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions)); ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config); Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking(); analysis.executeBlocking();
for (PortAction<Integer> ia : inputActions) {
Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
System.out.println("collectorSink: " + collectorSink.getElements());
}
assertThat(config.getOutputElements(), contains(0)); assertThat(config.getOutputElements(), contains(0));
assertValuesForIndex(inputActions, Arrays.asList(1), 0); assertValuesForIndex(inputActions, Arrays.asList(1), 0);
assertValuesForIndex(inputActions, Arrays.asList(2), 1); assertValuesForIndex(inputActions, Arrays.asList(2), 1);
...@@ -84,6 +69,37 @@ public class ControlledDistributorTest { ...@@ -84,6 +69,37 @@ public class ControlledDistributorTest {
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4); assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
} }
@Test
public void shouldWorkWithRemoveActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
@SuppressWarnings("unchecked")
PortAction<Integer>[] inputActions = new PortAction[6];
inputActions[0] = createPortCreateAction();
inputActions[1] = new RemovePortAction<Integer>(null);
inputActions[2] = createPortCreateAction();
inputActions[3] = createPortCreateAction();
inputActions[4] = new RemovePortAction<Integer>(null);
inputActions[5] = new RemovePortAction<Integer>(null);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5));
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0);
assertValuesForIndex(inputActions, Arrays.asList(3), 2);
assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
}
private PortAction<Integer> createPortCreateAction() {
CollectorSink<Integer> newStage = new CollectorSink<Integer>();
PortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort());
return portAction;
}
private void assertValuesForIndex(final PortAction<Integer>[] inputActions, private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
final List<Integer> values, final int index) { final List<Integer> values, final int index) {
PortAction<Integer> ia = inputActions[index]; PortAction<Integer> ia = inputActions[index];
...@@ -99,16 +115,13 @@ public class ControlledDistributorTest { ...@@ -99,16 +115,13 @@ public class ControlledDistributorTest {
public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) { public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements); InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
// InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>(); DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
collectorSink = new CollectorSink<T>(); collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
addThreadableStage(initialElementProducer); addThreadableStage(initialElementProducer);
// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
addThreadableStage(distributor); addThreadableStage(distributor);
addThreadableStage(collectorSink); addThreadableStage(collectorSink);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment