Skip to content
Snippets Groups Projects
WorkerThread.java 6.49 KiB
Newer Older
Christian Wulf's avatar
Christian Wulf committed
/***************************************************************************
 * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ***************************************************************************/

package teetime.framework.concurrent;

import java.util.LinkedList;
import java.util.List;

import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.util.StopWatch;

/**
 * @author Christian Wulf
Christian Wulf's avatar
Christian Wulf committed
 *
Christian Wulf's avatar
Christian Wulf committed
 * @since 1.10
 */
public class WorkerThread extends Thread {

	private final IPipeline pipeline;
	private IStageScheduler stageScheduler;

	private volatile StageTerminationPolicy terminationPolicy;
	private volatile boolean shouldTerminate = false;
	private final int accessesDeviceId;
	private int executedUnsuccessfullyCount;
	private final StopWatch stopWatch = new StopWatch();
	private final StopWatch iterationStopWatch = new StopWatch();
	private final List<Long> schedulingOverheadsInNs = new LinkedList<Long>();
	private long durationInNs;

	public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) {
		this.pipeline = pipeline;
		for (final IStage stage : pipeline.getStages()) {
			stage.setOwningThread(this);
		}
		this.accessesDeviceId = accessesDeviceId;
	}

	@Override
	public void run() {
		try {
			this.initDatastructures();
		} catch (final Exception e) {
			throw new IllegalStateException(e);
		}

		long iterations = 0;
		long schedulingOverheadInNs = 0;
		this.stopWatch.start();

		while (this.stageScheduler.isAnyStageActive()) {
			iterations++;
Christian Wulf's avatar
Christian Wulf committed
//			this.iterationStopWatch.start();
Christian Wulf's avatar
Christian Wulf committed

			final IStage stage = this.stageScheduler.get();

			this.startStageExecution(stage);
			final boolean executedSuccessfully = stage.execute();
			this.finishStageExecution(stage, executedSuccessfully);

			if (this.shouldTerminate) {
				this.executeTerminationPolicy(stage, executedSuccessfully);
			}
			this.stageScheduler.determineNextStage(stage, executedSuccessfully);

Christian Wulf's avatar
Christian Wulf committed
//			this.iterationStopWatch.end();
//			final long schedulingOverhead = this.iterationStopWatch.getDurationInNs() - stage.getLastDuration();
//			schedulingOverheadInNs += schedulingOverhead;
//			if ((iterations % 10000) == 0) {
//				this.schedulingOverheadsInNs.add(schedulingOverheadInNs);
//				schedulingOverheadInNs = 0;
//			}
Christian Wulf's avatar
Christian Wulf committed
		}

		this.stopWatch.end();
Christian Wulf's avatar
Christian Wulf committed
		this.durationInNs = this.stopWatch.getDurationInNs();
Christian Wulf's avatar
Christian Wulf committed

		final List<Long> durations = ((NextStageScheduler) this.stageScheduler).getDurations();
		long overallDuration = 0;
		for (int i = durations.size() / 2; i < durations.size(); i++) {
			overallDuration += durations.get(i);
		}
		// System.out.println("Scheduler determine next stage (" + (durations.size() / 2) + "): " + TimeUnit.NANOSECONDS.toMillis(overallDuration) + " ms");

		this.cleanUpDatastructures();
	}

	private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) {
		// System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy + ", executedSuccessfully=" + executedSuccessfully
		// + ", mayBeDisabled=" + executedStage.mayBeDisabled());

		switch (this.terminationPolicy) {
		case TERMINATE_STAGE_AFTER_NEXT_EXECUTION:
			if (executedStage.mayBeDisabled()) {
				this.stageScheduler.disable(executedStage);
			}
			break;
		case TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION:
			if (!executedSuccessfully) {
				if (executedStage.mayBeDisabled()) {
					this.stageScheduler.disable(executedStage);
				}
			}
			break;
		case TERMINATE_STAGE_NOW:
			for (final IStage stage : this.pipeline.getStages()) {
				this.stageScheduler.disable(stage);
			}
			break;
		default:
			break;
		}
	}

	private void initDatastructures() throws Exception {
		this.pipeline.fireStartNotification();
		this.stageScheduler = new NextStageScheduler(this.pipeline, this.accessesDeviceId);
	}

	private void startStageExecution(final IStage stage) {
		// System.out.println("Executing stage: " + stage);
	}

	private void finishStageExecution(final IStage stage, final boolean executedSuccessfully) {
		// System.out.println("Executed stage " + stage + " successfully: " + executedSuccessfully);
		if (!executedSuccessfully) { // statistics
			this.executedUnsuccessfullyCount++;
		}
	}

	private void cleanUpDatastructures() {
		// System.out.println("Cleaning up datastructures...");
		// System.out.println("Firing stop notification...");
		this.pipeline.fireStopNotification();
		// System.out.println("Thread terminated:" + this);
		// System.out.println(this.getName() + ": executedUnsuccessfullyCount=" + this.executedUnsuccessfullyCount);
	}

	public IPipeline getPipeline() {
		return this.pipeline;
	}

	// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically disables itself if it has no input ports
	public void terminate(final StageTerminationPolicy terminationPolicyToUse) {
		for (final IStage startStage : this.pipeline.getStartStages()) {
			startStage.fireSignalClosingToAllInputPorts();
		}

		this.setTerminationPolicy(terminationPolicyToUse);
	}

	/**
	 * If not set, this thread will run infinitely.
Christian Wulf's avatar
Christian Wulf committed
	 *
Christian Wulf's avatar
Christian Wulf committed
	 * @param terminationPolicyToUse
	 */
	public void setTerminationPolicy(final StageTerminationPolicy terminationPolicyToUse) {
		this.terminationPolicy = terminationPolicyToUse;
		this.shouldTerminate = true;
	}

	public int getExecutedUnsuccessfullyCount() {
		return this.executedUnsuccessfullyCount;
	}

	public List<Long> getSchedulingOverheadsInNs() {
		return this.schedulingOverheadsInNs;
	}

	/**
	 * @since 1.10
	 */
	public long getDurationInNs() {
		return this.durationInNs;
	}

	/**
	 * Uses the last half of values to compute the scheduling overall overhead in ns
Christian Wulf's avatar
Christian Wulf committed
	 *
Christian Wulf's avatar
Christian Wulf committed
	 * @since 1.10
	 */
	public long computeSchedulingOverheadInNs() {
		final int size = this.schedulingOverheadsInNs.size();

		long schedulingOverheadInNs = 0;
		for (int i = size / 2; i < size; i++) {
			final Long iterationOverhead = this.schedulingOverheadsInNs.get(i);
			schedulingOverheadInNs += iterationOverhead;
		}

		return schedulingOverheadInNs;
	}
}