Skip to content
Snippets Groups Projects
Commit b5b24312 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

modified test and added bigger testfile

parent 79402eb2
No related branches found
No related tags found
No related merge requests found
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.io;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
/**
* @author Christian Wulf
*
*/
public final class File2Lines extends AbstractConsumerStage<File> {
private final OutputPort<String> outputPort = this.createOutputPort();
private String charset = "UTF-8";
@Override
protected void execute(final File textFile) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset));
String line;
while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.length() != 0) {
outputPort.send(line);
} // else: ignore empty line
}
} catch (final FileNotFoundException e) {
this.logger.error("", e);
} catch (final IOException e) {
this.logger.error("", e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (final IOException e) {
this.logger.warn("", e);
}
}
}
public String getCharset() {
return this.charset;
}
public void setCharset(final String charset) {
this.charset = charset;
}
public OutputPort<String> getOutputPort() {
return outputPort;
}
}
wiki @ 63ccbbc8
Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8 Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de
...@@ -23,7 +23,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; ...@@ -23,7 +23,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.merger.Merger; import teetime.stage.basic.merger.Merger;
import teetime.stage.io.File2ByteArray; import teetime.stage.io.File2Lines;
import teetime.stage.string.WordCounter; import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap; import teetime.stage.util.CountingMap;
...@@ -48,8 +48,7 @@ public class WordCountingConfiguration extends AnalysisConfiguration { ...@@ -48,8 +48,7 @@ public class WordCountingConfiguration extends AnalysisConfiguration {
public WordCountingConfiguration(final int threads, final File... input) { public WordCountingConfiguration(final int threads, final File... input) {
// First part of the config // First part of the config
final InitialElementProducer<File> init = new InitialElementProducer<File>(input); final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
final File2ByteArray f2b = new File2ByteArray(); final File2Lines f2b = new File2Lines();
final ByteArray2String b2s = new ByteArray2String();
final Distributor<String> dist = new Distributor<String>(); final Distributor<String> dist = new Distributor<String>();
// last part // last part
...@@ -62,14 +61,13 @@ public class WordCountingConfiguration extends AnalysisConfiguration { ...@@ -62,14 +61,13 @@ public class WordCountingConfiguration extends AnalysisConfiguration {
// Connecting the stages of the first part of the config // Connecting the stages of the first part of the config
intraFact.create(init.getOutputPort(), f2b.getInputPort()); intraFact.create(init.getOutputPort(), f2b.getInputPort());
intraFact.create(f2b.getOutputPort(), b2s.getInputPort()); intraFact.create(f2b.getOutputPort(), dist.getInputPort());
intraFact.create(b2s.getOutputPort(), dist.getInputPort());
// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
WordCounter wc; WordCounter wc;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
wc = new WordCounter(); wc = new WordCounter();
interFact.create(dist.getNewOutputPort(), wc.getInputPort()); interFact.create(dist.getNewOutputPort(), wc.getInputPort(), 10000);
interFact.create(wc.getOutputPort(), merger.getNewInputPort()); interFact.create(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread // Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc); addThreadableStage(wc);
......
...@@ -25,36 +25,29 @@ import org.junit.Test; ...@@ -25,36 +25,29 @@ import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Analysis;
import teetime.stage.util.CountingMap; import teetime.stage.util.CountingMap;
import com.google.common.base.CharMatcher;
import com.google.common.base.Charsets;
import com.google.common.base.Splitter;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.io.Files;
public class WordCountingTest { public class WordCountingTest {
private static final File testFile = new File("src/test/resources/data/output.txt"); private static final File testFile = new File("src/test/resources/data/consoleText.txt-aa");
@Test @Test
public void test1() throws IOException { public void test1() throws IOException {
int threads = 1; int threads = 4;
WordCountingConfiguration wcc = new WordCountingConfiguration(threads, testFile, testFile); WordCountingConfiguration wcc = new WordCountingConfiguration(threads, testFile);
Analysis analysis = new Analysis(wcc); Analysis analysis = new Analysis(wcc);
analysis.start(); analysis.start();
CountingMap<String> map = wcc.getResult(); CountingMap<String> map = wcc.getResult();
assertEquals(new Integer(wordOccurrences(testFile).count(new String("diam")) * 2), map.get("diam")); assertEquals(new Integer(525059), map.get("rsa"));
assertEquals(new Integer(wordOccurrences(testFile).count(new String("tation")) * 2), map.get("tation")); // assertEquals(new Integer(wordOccurrences(testFile).count(new String("tation"))), map.get("tation"));
assertEquals(new Integer(wordOccurrences(testFile).count(new String("cum")) * 2), map.get("cum")); // assertEquals(new Integer(wordOccurrences(testFile).count(new String("cum"))), map.get("cum"));
}
private Multiset<String> wordOccurrences(final File file) throws IOException {
return HashMultiset.create(
Splitter.on(CharMatcher.WHITESPACE)
.trimResults()
.omitEmptyStrings()
.split(Files.asCharSource(testFile, Charsets.UTF_8).read()));
} }
//
// private Multiset<String> wordOccurrences(final File file) throws IOException {
// return HashMultiset.create(
// Splitter.on(CharMatcher.WHITESPACE)
// .trimResults()
// .omitEmptyStrings()
// .split(Files.asCharSource(testFile, Charsets.UTF_8).read()));
// }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment