Skip to content
Snippets Groups Projects
Commit 75b8736c authored by Christian Claus Wiechmann's avatar Christian Claus Wiechmann
Browse files

Controller implemented (no test yet)

parent 2e2b1cab
No related branches found
No related tags found
1 merge request!67Task Farm branch
...@@ -13,9 +13,12 @@ final public class AdaptationThread extends Thread { ...@@ -13,9 +13,12 @@ final public class AdaptationThread extends Thread {
private class ScheduledTaskFarm { private class ScheduledTaskFarm {
private final TaskFarmStage<?, ?, ?> taskFarmStage; private final TaskFarmStage<?, ?, ?> taskFarmStage;
private final TaskFarmAnalyzer analyzer; private final TaskFarmAnalyzer analyzer;
private final TaskFarmController controller; private final TaskFarmController<?, ?, ?> controller;
public ScheduledTaskFarm(final TaskFarmStage<?, ?, ?> taskFarmStage, final TaskFarmAnalyzer analyzer, final TaskFarmController controller) { public ScheduledTaskFarm(
final TaskFarmStage<?, ?, ?> taskFarmStage,
final TaskFarmAnalyzer analyzer,
final TaskFarmController<?, ?, ?> controller) {
this.taskFarmStage = taskFarmStage; this.taskFarmStage = taskFarmStage;
this.analyzer = analyzer; this.analyzer = analyzer;
this.controller = controller; this.controller = controller;
...@@ -24,10 +27,14 @@ final public class AdaptationThread extends Thread { ...@@ -24,10 +27,14 @@ final public class AdaptationThread extends Thread {
private final List<ScheduledTaskFarm> monitoredTaskFarms = new LinkedList<ScheduledTaskFarm>(); private final List<ScheduledTaskFarm> monitoredTaskFarms = new LinkedList<ScheduledTaskFarm>();
protected void addTaskFarm(final TaskFarmStage<?, ?, ?> taskFarmStage) { protected <I, O, TFS extends TaskFarmDuplicable<I, O>> void addTaskFarm(final TaskFarmStage<I, O, TFS> taskFarmStage) {
TaskFarmAnalyzer analyzer = new TaskFarmAnalyzer(); TaskFarmAnalyzer analyzer = new TaskFarmAnalyzer();
TaskFarmController controller = new TaskFarmController(taskFarmStage.getConfiguration()); TaskFarmController<I, O, TFS> controller = new TaskFarmController<I, O, TFS>(taskFarmStage.getConfiguration());
this.monitoredTaskFarms.add(new ScheduledTaskFarm(taskFarmStage, analyzer, controller)); this.monitoredTaskFarms.add(new ScheduledTaskFarm(taskFarmStage, analyzer, controller));
} }
@Override
public void run() {
// TODO Auto-generated method stub
}
} }
...@@ -53,7 +53,7 @@ public class TaskFarmStage<I, O, TFS extends TaskFarmDuplicable<I, O>> extends A ...@@ -53,7 +53,7 @@ public class TaskFarmStage<I, O, TFS extends TaskFarmDuplicable<I, O>> extends A
} }
} }
public TaskFarmConfiguration<?, ?, ?> getConfiguration() { public TaskFarmConfiguration<I, O, TFS> getConfiguration() {
return this.configuration; return this.configuration;
} }
} }
package teetime.stage.taskfarm.execution; package teetime.stage.taskfarm.execution;
import teetime.framework.DynamicInputPort;
import teetime.framework.DynamicOutputPort;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe;
import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.merger.dynamic.DynamicMerger;
import teetime.stage.taskfarm.TaskFarmConfiguration; import teetime.stage.taskfarm.TaskFarmConfiguration;
import teetime.stage.taskfarm.TaskFarmDuplicable;
import teetime.stage.taskfarm.TaskFarmTriple;
import teetime.util.framework.port.PortAction;
public class TaskFarmController { public class TaskFarmController<I, O, TFS extends TaskFarmDuplicable<I, O>> {
final private TaskFarmConfiguration<?, ?, ?> configuration; final private TaskFarmConfiguration<I, O, TFS> configuration;
public TaskFarmController(final TaskFarmConfiguration<?, ?, ?> configuration) { public TaskFarmController(final TaskFarmConfiguration<I, O, TFS> taskFarmConfiguration) {
this.configuration = configuration; this.configuration = taskFarmConfiguration;
} }
public void addStageToTaskFarm() { public void addStageToTaskFarm() {
TaskFarmDuplicable<I, O> newStage = configuration.getFirstStage().duplicate();
PortAction<DynamicMerger<O>> mergerPortAction =
new teetime.stage.basic.merger.dynamic.CreatePortAction<O>(newStage.getOutputPort());
configuration.getMerger().addPortActionRequest(mergerPortAction);
PortAction<DynamicDistributor<I>> distributorPortAction =
new teetime.stage.basic.distributor.dynamic.CreatePortAction<I>(newStage.getInputPort());
configuration.getDistributor().addPortActionRequest(distributorPortAction);
} }
public void removeStageFromTaskFarm() { public void removeStageFromTaskFarm() {
// FIXME: Do not remove if number of stages == 1
TaskFarmDuplicable<I, O> stageToBeRemoved = getStageToBeRemoved();
OutputPort<?> distributorOutputPort = getRemoveableDistributorOutputPort(stageToBeRemoved);
InputPort<?> mergerInputPort = getRemoveableMergerInputPort(stageToBeRemoved);
try {
@SuppressWarnings("unchecked")
PortAction<DynamicDistributor<I>> distributorPortAction =
new teetime.stage.basic.distributor.dynamic.RemovePortAction<I>((DynamicOutputPort<I>) distributorOutputPort);
configuration.getDistributor().addPortActionRequest(distributorPortAction);
@SuppressWarnings("unchecked")
PortAction<DynamicMerger<O>> mergerPortAction =
new teetime.stage.basic.merger.dynamic.RemovePortAction<O>((DynamicInputPort<O>) mergerInputPort);
configuration.getMerger().addPortActionRequest(mergerPortAction);
} catch (ClassCastException e) {
// TODO: Exception thrown because of wrong types. Should not happen, theoretically.
}
} }
private void getLastAddedStage() { private InputPort<?> getRemoveableMergerInputPort(final TaskFarmDuplicable<I, O> stageToBeRemoved) {
InputPort<?> mergerInputPort = stageToBeRemoved.getOutputPort().getPipe().getTargetPort();
return mergerInputPort;
}
private OutputPort<?> getRemoveableDistributorOutputPort(final TaskFarmDuplicable<I, O> stageToBeRemoved) {
OutputPort<?>[] potentialDistributorOutputPorts = configuration.getDistributor().getOutputPorts();
OutputPort<?> distributorOutputPort = null;
for (int i = 0; i < potentialDistributorOutputPorts.length; i++) {
IPipe pipe = potentialDistributorOutputPorts[i].getPipe();
if (pipe.equals(stageToBeRemoved.getInputPort().getPipe())) {
distributorOutputPort = potentialDistributorOutputPorts[i];
break;
}
} }
if (distributorOutputPort == null) {
// TODO: Better exception
throw new RuntimeException();
}
return distributorOutputPort;
}
private TaskFarmDuplicable<I, O> getStageToBeRemoved() {
// just get last added stage
TaskFarmTriple<I, O, TFS> triple = configuration.getTriples().get(configuration.getTriples().size() - 1);
return triple.getStage();
}
} }
...@@ -24,6 +24,22 @@ public class TaskFarmStageTest { ...@@ -24,6 +24,22 @@ public class TaskFarmStageTest {
private final static int NUMBER_OF_TEST_ELEMENTS = 1000; private final static int NUMBER_OF_TEST_ELEMENTS = 1000;
@Test
public void simpleTaskFarmStageTest() {
TestConfiguration configuration = new TestConfiguration();
final Execution<TestConfiguration> execution = new Execution<TestConfiguration>(configuration);
execution.executeBlocking();
List<String> result = configuration.getCollection();
for (int i = 1; i <= NUMBER_OF_TEST_ELEMENTS; i++) {
int n = i + 1;
String s = Integer.toString(n) + Integer.toString(n) + Integer.toString(n) + Integer.toString(n);
assertTrue("Does not contain: " + s, result.contains(s));
}
assertThat(result.size(), is(equalTo(NUMBER_OF_TEST_ELEMENTS)));
}
private class PlusOneInStringStage extends AbstractConsumerStage<Integer> { private class PlusOneInStringStage extends AbstractConsumerStage<Integer> {
private final OutputPort<String> outputPort = this.createOutputPort(); private final OutputPort<String> outputPort = this.createOutputPort();
...@@ -117,20 +133,4 @@ public class TaskFarmStageTest { ...@@ -117,20 +133,4 @@ public class TaskFarmStageTest {
return this.collection; return this.collection;
} }
} }
@Test
public void simpleTaskFarmStageTest() {
TestConfiguration configuration = new TestConfiguration();
final Execution<TestConfiguration> execution = new Execution<TestConfiguration>(configuration);
execution.executeBlocking();
List<String> result = configuration.getCollection();
for (int i = 1; i <= NUMBER_OF_TEST_ELEMENTS; i++) {
int n = i + 1;
String s = Integer.toString(n) + Integer.toString(n) + Integer.toString(n) + Integer.toString(n);
assertTrue("Does not contain: " + s, result.contains(s));
}
assertThat(result.size(), is(equalTo(NUMBER_OF_TEST_ELEMENTS)));
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment