Skip to content
Snippets Groups Projects
Commit c88f7bfa authored by Christian Claus Wiechmann's avatar Christian Claus Wiechmann
Browse files

Checks added to TaskFarmStage

parent 13a2a4c7
No related branches found
No related tags found
1 merge request!67Task Farm branch
...@@ -45,7 +45,7 @@ public abstract class AbstractCompositeStage extends Stage { ...@@ -45,7 +45,7 @@ public abstract class AbstractCompositeStage extends Stage {
.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
protected final Set<Stage> containingStages = new HashSet<Stage>(); protected final Set<Stage> containingStages = new HashSet<Stage>();
public final Set<Stage> lastStages = new HashSet<Stage>(); protected final Set<Stage> lastStages = new HashSet<Stage>();
protected abstract Stage getFirstStage(); protected abstract Stage getFirstStage();
......
package teetime.framework.exceptionHandling;
public class TaskFarmInvalidPipeException extends RuntimeException {
private static final long serialVersionUID = 219434999064433531L;
public TaskFarmInvalidPipeException(final String s) {
super(s);
}
}
package teetime.framework.exceptionHandling;
public class TaskFarmInvalidStageException extends RuntimeException {
private static final long serialVersionUID = -2024432280298919911L;
public TaskFarmInvalidStageException(final String s) {
super(s);
}
}
...@@ -7,6 +7,8 @@ import teetime.framework.AbstractCompositeStage; ...@@ -7,6 +7,8 @@ import teetime.framework.AbstractCompositeStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.TaskFarmInvalidPipeException;
import teetime.framework.exceptionHandling.TaskFarmInvalidStageException;
import teetime.framework.pipe.IMonitorablePipe; import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
...@@ -25,29 +27,79 @@ public class TaskFarmStage<T> extends AbstractCompositeStage { ...@@ -25,29 +27,79 @@ public class TaskFarmStage<T> extends AbstractCompositeStage {
private final Distributor<T> distributor = new Distributor<T>(); private final Distributor<T> distributor = new Distributor<T>();
private final Merger<T> merger = new Merger<T>(); private final Merger<T> merger = new Merger<T>();
private final AbstractCompositeStage includedStage;
private final Map<Integer, AbstractCompositeStage> includedStages = new HashMap<Integer, AbstractCompositeStage>();
private final Map<Integer, IMonitorablePipe> inputPipes = new HashMap<Integer, IMonitorablePipe>(); private final Map<Integer, IMonitorablePipe> inputPipes = new HashMap<Integer, IMonitorablePipe>();
private final Map<Integer, IMonitorablePipe> outputPipes = new HashMap<Integer, IMonitorablePipe>(); private final Map<Integer, IMonitorablePipe> outputPipes = new HashMap<Integer, IMonitorablePipe>();
public TaskFarmStage(final AbstractCompositeStage includedStage) { public TaskFarmStage(final AbstractCompositeStage includedStage) {
this.includedStage = includedStage; this.includedStages.put(0, includedStage);
this.lastStages.add(this.merger); this.lastStages.add(this.merger);
InputPort<T> stageInputPort = (InputPort<T>) this.includedStage.getInputPorts()[0]; checkIfValidAsIncludedStage(includedStage);
@SuppressWarnings("unchecked")
InputPort<T> stageInputPort = (InputPort<T>) includedStage.getInputPorts()[0];
IPipe inputPipe = connectPortsWithReturnValue(this.distributor.getNewOutputPort(), stageInputPort); IPipe inputPipe = connectPortsWithReturnValue(this.distributor.getNewOutputPort(), stageInputPort);
inputPipes.put(0, (IMonitorablePipe) inputPipe); checkIfPipeIsMonitorable(inputPipe);
this.inputPipes.put(0, (IMonitorablePipe) inputPipe);
OutputPort<T> stageOutputPort = (OutputPort<T>) this.includedStage.getOutputPorts()[0]; @SuppressWarnings("unchecked")
OutputPort<T> stageOutputPort = (OutputPort<T>) includedStage.getOutputPorts()[0];
IPipe outputPipe = connectPortsWithReturnValue(stageOutputPort, this.merger.getNewInputPort()); IPipe outputPipe = connectPortsWithReturnValue(stageOutputPort, this.merger.getNewInputPort());
outputPipes.put(0, (IMonitorablePipe) outputPipe); checkIfPipeIsMonitorable(outputPipe);
this.outputPipes.put(0, (IMonitorablePipe) outputPipe);
// TODO Check if getInputPorts returns InputPorts of T (same for OutputPorts)
// TODO Check if there is only one input and output port each of the included stage (may not have more than one last stages)
// TODO Check if input and output pipe are indeed of IMonitorablePipe
// TODO Do simple test analysis... // TODO Do simple test analysis...
} }
private void checkIfPipeIsMonitorable(final IPipe pipe) {
if (!(pipe instanceof IMonitorablePipe)) {
throw new TaskFarmInvalidPipeException("Pipe is not monitorable, which is required for a Task Farm.");
}
}
private void checkIfValidAsIncludedStage(final AbstractCompositeStage includedStage) {
checkInputPorts(includedStage);
checkOutputPorts(includedStage);
}
private void checkOutputPorts(final AbstractCompositeStage includedStage) {
OutputPort<?>[] stageOutputPorts = includedStage.getOutputPorts();
if (stageOutputPorts.length > 1) {
throw new TaskFarmInvalidStageException("Included stage has more than one output port.");
}
if (stageOutputPorts.length < 1) {
throw new TaskFarmInvalidStageException("Included stage has no output ports.");
}
try {
@SuppressWarnings("all")
OutputPort<T> _ = (OutputPort<T>) stageOutputPorts[0];
} catch (Exception _) {
throw new TaskFarmInvalidStageException("Output port of included stage does not have the same type as the Task Farm.");
}
}
private void checkInputPorts(final AbstractCompositeStage includedStage) {
InputPort<?>[] stageInputPorts = includedStage.getInputPorts();
if (stageInputPorts.length > 1) {
throw new TaskFarmInvalidStageException("Included stage has more than one input port.");
}
if (stageInputPorts.length < 1) {
throw new TaskFarmInvalidStageException("Included stage has no input ports.");
}
try {
@SuppressWarnings("all")
InputPort<T> _ = (InputPort<T>) stageInputPorts[0];
} catch (Exception _) {
throw new TaskFarmInvalidStageException("Input port of included stage does not have the same type as the Task Farm.");
}
}
@Override @Override
protected Stage getFirstStage() { protected Stage getFirstStage() {
return this.distributor; return this.distributor;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment