Skip to content
Snippets Groups Projects
Commit 227619de authored by Christian Wulf's avatar Christian Wulf
Browse files

refactored tests

parent dc9188cb
No related branches found
No related tags found
No related merge requests found
package teetime.stage.basic.distributor.dynamic;
import teetime.util.framework.port.PortActionHelper;
class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
@Override
protected void checkForPendingPortActionRequest() {
try {
PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
// @Override
// protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes
// return super.getOutputPorts();
// }
}
......@@ -8,6 +8,8 @@ import teetime.framework.OutputPortRemovedListener;
import teetime.framework.Stage;
import teetime.framework.signal.TerminatingSignal;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.IDistributorStrategy;
import teetime.stage.basic.distributor.RoundRobinStrategy2;
import teetime.util.framework.port.PortAction;
import teetime.util.framework.port.PortActionHelper;
......@@ -15,7 +17,15 @@ public class DynamicDistributor<T> extends Distributor<T> implements OutputPortR
protected final BlockingQueue<PortAction<DynamicDistributor<T>>> portActions;
/**
* Uses {@link RoundRobinStrategy2} as default distributor strategy.
*/
public DynamicDistributor() {
this(new RoundRobinStrategy2());
}
public DynamicDistributor(final IDistributorStrategy strategy) {
super(strategy);
this.portActions = PortActionHelper.createPortActionQueue();
addOutputPortRemovedListener(this);
}
......
......@@ -17,9 +17,9 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> {
public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<?> outputPortToRemove;
if (dynamicDistributor instanceof ControlledDynamicDistributor) {
if (null == outputPort) {
// for testing purposes only
OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts();
OutputPort<?>[] outputPorts = ((DynamicDistributor<?>) dynamicDistributor).getOutputPorts();
outputPortToRemove = outputPorts[outputPorts.length - 1];
} else {
outputPortToRemove = outputPort;
......
......@@ -28,8 +28,8 @@ public class ControlledDistributorTest {
@SuppressWarnings("unchecked")
List<PortAction<DynamicDistributor<Integer>>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config,
DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, inputActions);
Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
......@@ -48,8 +48,8 @@ public class ControlledDistributorTest {
inputActions[i] = createAction;
}
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config,
DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
......@@ -75,8 +75,8 @@ public class ControlledDistributorTest {
inputActions[4] = new RemovePortAction<Integer>(null);
inputActions[5] = new RemovePortAction<Integer>(null);
ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Execution<ControlledDistributorTestConfig<Integer>> analysis = new Execution<ControlledDistributorTestConfig<Integer>>(config,
DynamicDistributorTestConfig<Integer> config = new DynamicDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
Execution<DynamicDistributorTestConfig<Integer>> analysis = new Execution<DynamicDistributorTestConfig<Integer>>(config,
new TerminatingExceptionListenerFactory());
analysis.executeBlocking();
......@@ -102,13 +102,13 @@ public class ControlledDistributorTest {
assertThat(collectorSink.getElements(), is(values));
}
private static class ControlledDistributorTestConfig<T> extends Configuration {
private static class DynamicDistributorTestConfig<T> extends Configuration {
private final CollectorSink<T> collectorSink;
public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<DynamicDistributor<T>>> inputActions) {
public DynamicDistributorTestConfig(final List<T> elements, final List<PortAction<DynamicDistributor<T>>> inputActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
DynamicDistributor<T> distributor = new DynamicDistributor<T>();
collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment