Skip to content
Snippets Groups Projects
Commit e732f4a4 authored by Christian Wulf's avatar Christian Wulf
Browse files

added checkstyle config file;

refactored package structure;
added correct and improved stage state management
parent c90c1c34
No related branches found
No related tags found
No related merge requests found
package teetime.framework.scheduling;
import java.util.Arrays;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IPortListener;
import teetime.framework.core.IStage;
import teetime.framework.scheduling.StageStateContainer.StageState;
public final class StageStateManager implements IPortListener {
private final StageStateContainer[] stageStateContainers;
public StageStateManager(final IPipeline pipeline) {
this.stageStateContainers = this.initStageStateContainers(pipeline);
this.registerAtAllInputPorts(pipeline);
}
private void registerAtAllInputPorts(final IPipeline pipeline) {
for (IStage stage : pipeline.getStages()) {
for (IInputPort<IStage, ?> inputPort : stage.getInputPorts()) {
inputPort.setPortListener(this);
}
}
}
private StageStateContainer[] initStageStateContainers(final IPipeline pipeline) {
StageStateContainer[] stageStateContainers = new StageStateContainer[pipeline.getStages().size()];
for (IStage stage : pipeline.getStages()) {
StageStateContainer stageStateContainer;
if (this.isConnectedWithAnotherThread(stage)) {
stageStateContainer = new SynchronizedStageStateContainer(stage);
} else {
stageStateContainer = new UnsynchronizedStageStateContainer(stage);
}
stageStateContainers[stage.getSchedulingIndex()] = stageStateContainer;
}
return stageStateContainers;
}
private boolean isConnectedWithAnotherThread(final IStage stage) {
// TODO Auto-generated method stub
return true;
}
@Override
public void onPortIsClosed(final IInputPort<?, ?> inputPort) {
StageStateContainer stageStateContainer = this.getStageStateContainer(inputPort.getOwningStage());
int newNumOpenedPorts = stageStateContainer.decNumOpenedPorts();
if (newNumOpenedPorts == 0) {
stageStateContainer.stageState = StageState.ALL_INPUT_PORTS_CLOSED;
// System.out.println("Closed stage: " + stageStateContainer.stage);
} else if (newNumOpenedPorts < 0) {
// TODO log warning
// this.logger.warning("Closed port more than once: portIndex=" + inputPort.getIndex() + " for stage " + this);
}
}
public boolean areAllInputPortsClosed(final IStage stage) {
StageStateContainer stageStateContainer = this.getStageStateContainer(stage);
return stageStateContainer.stageState == StageState.ALL_INPUT_PORTS_CLOSED;
}
public boolean isStageEnabled(final IStage stage) {
StageStateContainer stageStateContainer = this.getStageStateContainer(stage);
return stageStateContainer.stageState == StageState.ENABLED;
}
public void disable(final IStage stage) {
StageStateContainer stageStateContainer = this.getStageStateContainer(stage);
stageStateContainer.stageState = StageState.DISABLED;
}
private StageStateContainer getStageStateContainer(final IStage stage) {
int schedulingIndex = stage.getSchedulingIndex();
StageStateContainer stageStateContainer = this.stageStateContainers[schedulingIndex];
if (stageStateContainer == null) {
throw new NullPointerException("No container found for index ="+schedulingIndex+"\n=> stageStateContainers="+Arrays.asList(this.stageStateContainers));
}
return stageStateContainer;
}
}
package teetime.framework.scheduling;
import java.util.concurrent.atomic.AtomicInteger;
import teetime.framework.core.IStage;
public final class SynchronizedStageStateContainer extends StageStateContainer {
private final AtomicInteger numOpenedPorts = new AtomicInteger();
public SynchronizedStageStateContainer(final IStage stage) {
super(stage);
this.numOpenedPorts.set(stage.getInputPorts().size());
}
@Override
public int decNumOpenedPorts() {
return this.numOpenedPorts.decrementAndGet();
}
}
package teetime.framework.scheduling;
import teetime.framework.core.IStage;
public final class UnsynchronizedStageStateContainer extends StageStateContainer {
private int numOpenedInputPorts;
public UnsynchronizedStageStateContainer(final IStage stage) {
super(stage);
this.numOpenedInputPorts = stage.getInputPorts().size();
}
@Override
public int decNumOpenedPorts() {
return --this.numOpenedInputPorts;
}
}
...@@ -19,14 +19,14 @@ import java.util.ArrayList; ...@@ -19,14 +19,14 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import kieker.common.logging.LogFactory;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import teetime.util.StatisticsUtil; import teetime.util.StatisticsUtil;
import teetime.util.StopWatch; import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment