Newer
Older
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.framework.concurrent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IPipe;
import teetime.framework.core.IPipeCommand;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.scheduling.NextStageScheduler;
import teetime.framework.scheduling.StageStateManager;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
* @since 1.10
*/
public class WorkerThread extends Thread {
private static final int NUM_ITERATIONS_TO_MEASURE = 10000;
private final IPipeline pipeline;
private IStageScheduler stageScheduler;
private StageStateManager stageStateManager;
private volatile StageTerminationPolicy terminationPolicy;
private volatile boolean shouldTerminate = false;
private final int accessesDeviceId;
private int executedUnsuccessfullyCount;
// statistics
private final List<Long> durationPerXIterationsInNs = new LinkedList<Long>();
private int iterations;
public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) {
this.pipeline = pipeline;
this.accessesDeviceId = accessesDeviceId;
}
private void initStages(final IPipeline pipeline) {
for (final IStage stage : pipeline.getStages()) {
stage.setOwningThread(this);
}
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
this.setDepthForEachStage(pipeline);
this.setSchedulingIndexForEachhStage(pipeline);
}
private void setDepthForEachStage(final IPipeline pipeline) {
final IPipeCommand setDepthCommand = new IPipeCommand() {
@Override
public void execute(final IPipe<?> pipe) throws Exception {
final IStage sourceStage = pipe.getSourcePort().getOwningStage();
final IStage owningStage = pipe.getTargetPort().getOwningStage();
if (owningStage.getDepth() == IStage.DEPTH_NOT_SET) {
owningStage.setDepth(sourceStage.getDepth() + 1);
owningStage.notifyOutputPipes(this);
}
}
};
for (final IStage startStage : pipeline.getStartStages()) {
startStage.setDepth(0);
}
for (final IStage startStage : pipeline.getStartStages()) {
try {
startStage.notifyOutputPipes(setDepthCommand);
} catch (final Exception e) {
throw new IllegalStateException("may not happen", e);
}
}
}
private List<IStage> setSchedulingIndexForEachhStage(final IPipeline pipeline) {
final List<IStage> stageList = new ArrayList<IStage>(pipeline.getStages());
final Comparator<? super IStage> depthComparator = new Comparator<IStage>() {
@Override
public int compare(final IStage o1, final IStage o2) {
if (o1.getDepth() == o2.getDepth()) {
return 0;
} else if (o1.getDepth() < o2.getDepth()) {
return -1;
} else {
return 1;
}
}
};
Collections.sort(stageList, depthComparator);
for (int i = 0; i < stageList.size(); i++) {
stageList.get(i).setSchedulingIndex(i);
}
return stageList;
}
@Override
public void run() {
try {
this.initDatastructures();
} catch (final Exception e) {
throw new IllegalStateException(e);
}
this.iterations = 0;
this.stopWatch.start();
while (this.stageScheduler.isAnyStageActive()) {
this.iterations++;
// this.iterationStopWatch.start();
// beforeStageExecutionStopWatch.start();
// beforeStageExecutionStopWatch.end();
// stageExecutionStopWatch.end();
// afterStageExecutionStopWatch.start();
if (this.shouldTerminate) {
this.executeTerminationPolicy(stage, executedSuccessfully);
}
this.stageScheduler.determineNextStage(stage, executedSuccessfully);
// afterStageExecutionStopWatch.end();
// this.iterationStopWatch.end();
// all stop watches are activated
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() -
// stageExecutionStopWatch.getDurationInNs(); //4952
// 6268 -> 5350 (w/o after) -> 4450 (w/o before) -> 3800 (w/o stage)
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs();
// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327
// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416
// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450
// rest: ~2000 (measurement overhead?)
if ((this.iterations % NUM_ITERATIONS_TO_MEASURE) == 0) {
this.stopWatch.end();
this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs());
this.stopWatch.start();
this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs());
this.cleanUpDatastructures();
}
private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) {
// System.out.println("executeTerminationPolicy executedStage=" + executedStage + ", executedSuccessfully=" + executedSuccessfully);
// System.out.println("executeTerminationPolicy areAllInputPortsClosed(executedStage)=" + this.stageStateManager.areAllInputPortsClosed(executedStage));
switch (this.terminationPolicy) {
case TERMINATE_STAGE_AFTER_NEXT_EXECUTION:
if (this.stageStateManager.areAllInputPortsClosed(executedStage)) {
this.stageScheduler.disable(executedStage);
}
break;
case TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION:
if (!executedSuccessfully) {
if (this.stageStateManager.areAllInputPortsClosed(executedStage)) {
this.stageScheduler.disable(executedStage);
}
}
break;
case TERMINATE_STAGE_NOW:
for (final IStage stage : this.pipeline.getStages()) {
this.stageScheduler.disable(stage);
}
break;
default:
break;
}
}
private void initDatastructures() throws Exception {
// stages need to be initialized here, because in a concurrent context some stages (e.g., a merger) is executed after its pipeline has been created.
this.initStages(this.pipeline);
this.stageStateManager = new StageStateManager(this.pipeline);
this.stageScheduler = new NextStageScheduler(this.pipeline, this.accessesDeviceId, this.stageStateManager);
for (final IStage startStage : this.pipeline.getStartStages()) {
for (IInputPort<IStage, ?> inputPort : startStage.getInputPorts()) {
if (inputPort.getState() == PortState.CLOSED) {
inputPort.close();
}
}
}
this.pipeline.fireStartNotification();
}
private void startStageExecution(final IStage stage) {
// System.out.println("Executing stage: " + stage);
}
private void finishStageExecution(final IStage stage, final boolean executedSuccessfully) {
if (!executedSuccessfully) { // statistics
this.executedUnsuccessfullyCount++;
}
}
private void cleanUpDatastructures() {
this.pipeline.fireStopNotification();
}
public IPipeline getPipeline() {
return this.pipeline;
}
// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically
// disables itself if it has no input ports
public void terminate(final StageTerminationPolicy terminationPolicyToUse) {
// for (final IStage startStage : this.pipeline.getStartStages()) {
// if (this.stageStateManager.areAllInputPortsClosed(startStage)) {
// startStage.fireSignalClosingToAllInputPorts();
// }
// }
this.setTerminationPolicy(terminationPolicyToUse);
}
/**
* If not set, this thread will run infinitely.
* @param terminationPolicyToUse
*/
public void setTerminationPolicy(final StageTerminationPolicy terminationPolicyToUse) {
this.terminationPolicy = terminationPolicyToUse;
this.shouldTerminate = true;
}
public int getExecutedUnsuccessfullyCount() {
return this.executedUnsuccessfullyCount;
}
public List<Long> getDurationPer10000IterationsInNs() {
public int getIterations() {