Skip to content
Snippets Groups Projects
QueuedCountWordsAnalysis.java 5.28 KiB
Newer Older
Christian Wulf's avatar
Christian Wulf committed
/***************************************************************************
 * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ***************************************************************************/

package teetime.examples.countWords;

import java.io.File;
import java.util.LinkedList;
import java.util.List;

import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Analysis;
import teetime.framework.core.IInputPort.PortState;
Christian Wulf's avatar
Christian Wulf committed
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline;
Christian Wulf's avatar
Christian Wulf committed
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe;
import teetime.stage.basic.RepeaterSource;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.merger.Merger;
import teetime.util.Pair;

/**
 * @author Christian Wulf
Christian Wulf's avatar
Christian Wulf committed
 * @since 1.10
 */
public class QueuedCountWordsAnalysis extends Analysis {

	private static final int NUM_TOKENS_TO_REPEAT = 1000;
	private static final String START_DIRECTORY_NAME = ".";
	private static final int SECONDS = 1000;

	private WorkerThread workerThread;

	@Override
	public void init() {
		super.init();

		final IPipeline pipeline = this.buildNonIoPipeline();

		this.workerThread = new WorkerThread(pipeline, 0);
	}

	@Override
	public void start() {
		super.start();

		this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);

		this.workerThread.start();
		try {
			this.workerThread.join(60 * SECONDS);
		} catch (final InterruptedException e) {
			e.printStackTrace();
		}
	}

	private IPipeline buildNonIoPipeline() {
		// create stages
		final RepeaterSource<String> repeaterSource = RepeaterSource.create(START_DIRECTORY_NAME, NUM_TOKENS_TO_REPEAT);
		final DirectoryName2Files findFilesStage = new DirectoryName2Files();
		final Distributor<File> distributor = new Distributor<File>();
		final CountWordsStage countWordsStage0 = new CountWordsStage();
		final CountWordsStage countWordsStage1 = new CountWordsStage();
		final Merger<Pair<File, Integer>> merger = new Merger<Pair<File, Integer>>();
		final OutputWordsCountSink outputWordsCountStage = new OutputWordsCountSink();

		// add each stage to a stage list
		final List<IStage> startStages = new LinkedList<IStage>();
		startStages.add(repeaterSource);

Christian Wulf's avatar
Christian Wulf committed
		final List<IStage> stages = new LinkedList<IStage>();
		stages.add(repeaterSource);
		stages.add(findFilesStage);
		stages.add(distributor);
		stages.add(countWordsStage0);
		stages.add(countWordsStage1);
		stages.add(merger);
		stages.add(outputWordsCountStage);

		// connect stages by pipes
		QueuePipe.connect(repeaterSource.OUTPUT, findFilesStage.DIRECTORY_NAME);
		QueuePipe.connect(findFilesStage.fileOutputPort, distributor.genericInputPort);
		QueuePipe.connect(distributor.getNewOutputPort(), countWordsStage0.FILE);
		QueuePipe.connect(distributor.getNewOutputPort(), countWordsStage1.FILE);
		QueuePipe.connect(countWordsStage0.WORDSCOUNT, merger.getNewInputPort());
		QueuePipe.connect(countWordsStage1.WORDSCOUNT, merger.getNewInputPort());
		QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);

		repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
		repeaterSource.START.setState(PortState.CLOSED);
Christian Wulf's avatar
Christian Wulf committed

		final Pipeline pipeline = new Pipeline();
		pipeline.setStartStages(startStages);
		pipeline.setStages(stages);
Christian Wulf's avatar
Christian Wulf committed
		return pipeline;
	}

	WorkerThread getWorkerThread() {
		return this.workerThread;
	}

	public static void main(final String[] args) {
		final QueuedCountWordsAnalysis analysis = new QueuedCountWordsAnalysis();
		analysis.init();
		final long start = System.currentTimeMillis();
		analysis.start();
		final long end = System.currentTimeMillis();
		// analysis.terminate();
		final long duration = end - start;
		System.out.println("duration: " + duration + " ms"); // NOPMD (Just for example purposes)

		final IPipeline pipeline = analysis.workerThread.getPipeline();

		for (final IStage stage : pipeline.getStages()) {
			if (stage instanceof AbstractFilter<?>) {
//				System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
Christian Wulf's avatar
Christian Wulf committed
			}
		}

		final DirectoryName2Files findFilesStage = (DirectoryName2Files) pipeline.getStages().get(1);
		System.out.println("findFilesStage: " + findFilesStage.getNumFiles()); // NOPMD (Just for example purposes)

		final OutputWordsCountSink outputWordsCountStage = (OutputWordsCountSink) pipeline.getStages().get(6);
		System.out.println("outputWordsCountStage: " + outputWordsCountStage.getNumFiles()); // NOPMD (Just for example purposes)
	}
}