Newer
Older
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.countWords;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Analysis;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe;
import teetime.stage.basic.RepeaterSource;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.merger.Merger;
import teetime.util.Pair;
/**
* @author Christian Wulf
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
* @since 1.10
*/
public class QueuedCountWordsAnalysis extends Analysis {
private static final int NUM_TOKENS_TO_REPEAT = 1000;
private static final String START_DIRECTORY_NAME = ".";
private static final int SECONDS = 1000;
private WorkerThread workerThread;
@Override
public void init() {
super.init();
final IPipeline pipeline = this.buildNonIoPipeline();
this.workerThread = new WorkerThread(pipeline, 0);
}
@Override
public void start() {
super.start();
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
this.workerThread.start();
try {
this.workerThread.join(60 * SECONDS);
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
private IPipeline buildNonIoPipeline() {
// create stages
final RepeaterSource<String> repeaterSource = RepeaterSource.create(START_DIRECTORY_NAME, NUM_TOKENS_TO_REPEAT);
final DirectoryName2Files findFilesStage = new DirectoryName2Files();
final Distributor<File> distributor = new Distributor<File>();
final CountWordsStage countWordsStage0 = new CountWordsStage();
final CountWordsStage countWordsStage1 = new CountWordsStage();
final Merger<Pair<File, Integer>> merger = new Merger<Pair<File, Integer>>();
final OutputWordsCountSink outputWordsCountStage = new OutputWordsCountSink();
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(repeaterSource);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(repeaterSource);
stages.add(findFilesStage);
stages.add(distributor);
stages.add(countWordsStage0);
stages.add(countWordsStage1);
stages.add(merger);
stages.add(outputWordsCountStage);
// connect stages by pipes
QueuePipe.connect(repeaterSource.OUTPUT, findFilesStage.DIRECTORY_NAME);
QueuePipe.connect(findFilesStage.fileOutputPort, distributor.genericInputPort);
QueuePipe.connect(distributor.getNewOutputPort(), countWordsStage0.FILE);
QueuePipe.connect(distributor.getNewOutputPort(), countWordsStage1.FILE);
QueuePipe.connect(countWordsStage0.WORDSCOUNT, merger.getNewInputPort());
QueuePipe.connect(countWordsStage1.WORDSCOUNT, merger.getNewInputPort());
QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
repeaterSource.START.setState(PortState.CLOSED);
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
WorkerThread getWorkerThread() {
return this.workerThread;
}
public static void main(final String[] args) {
final QueuedCountWordsAnalysis analysis = new QueuedCountWordsAnalysis();
analysis.init();
final long start = System.currentTimeMillis();
analysis.start();
final long end = System.currentTimeMillis();
// analysis.terminate();
final long duration = end - start;
System.out.println("duration: " + duration + " ms"); // NOPMD (Just for example purposes)
final IPipeline pipeline = analysis.workerThread.getPipeline();
for (final IStage stage : pipeline.getStages()) {
if (stage instanceof AbstractFilter<?>) {
// System.out.println(stage.getClass().getName() + ": " + ((AbstractFilter<?>) stage).getOverallDurationInNs()); // NOPMD (Just for example purposes)
}
}
final DirectoryName2Files findFilesStage = (DirectoryName2Files) pipeline.getStages().get(1);
System.out.println("findFilesStage: " + findFilesStage.getNumFiles()); // NOPMD (Just for example purposes)
final OutputWordsCountSink outputWordsCountStage = (OutputWordsCountSink) pipeline.getStages().get(6);
System.out.println("outputWordsCountStage: " + outputWordsCountStage.getNumFiles()); // NOPMD (Just for example purposes)
}
}