diff --git a/src/main/java/teetime/stage/io/File2Lines.java b/src/main/java/teetime/stage/io/File2Lines.java
new file mode 100644
index 0000000000000000000000000000000000000000..e19f9c6dc6bdbee14a2308344f8d04dead46d2b6
--- /dev/null
+++ b/src/main/java/teetime/stage/io/File2Lines.java
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.stage.io;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import teetime.framework.AbstractConsumerStage;
+import teetime.framework.OutputPort;
+
+/**
+ * Reads each given text file line by line and sends every non-empty line,
+ * stripped of leading and trailing whitespace, to the output port.
+ * <p>
+ * The file is decoded with the configured charset, which is {@code UTF-8} unless
+ * changed via {@link #setCharset(String)}. I/O errors are logged, not rethrown.
+ *
+ * @author Christian Wulf
+ *
+ */
+public final class File2Lines extends AbstractConsumerStage<File> {
+
+	private final OutputPort<String> outputPort = this.createOutputPort();
+
+	/** Name of the charset used to decode the content of each file. */
+	private String charset = "UTF-8";
+
+	/**
+	 * Sends every non-empty, trimmed line of the given file to the output port.
+	 * Failures to open, read, or close the file are logged and not rethrown.
+	 *
+	 * @param textFile
+	 *            the file to read
+	 */
+	@Override
+	protected void execute(final File textFile) {
+		// Hold on to the raw stream: if the charset name is unsupported, the reader is never
+		// created, so closing only the reader would leave the already opened file handle open.
+		FileInputStream stream = null;
+		try {
+			stream = new FileInputStream(textFile);
+			final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, this.charset));
+			String line;
+			while ((line = reader.readLine()) != null) {
+				line = line.trim();
+				if (line.length() != 0) {
+					outputPort.send(line);
+				} // else: ignore empty line
+			}
+		} catch (final FileNotFoundException e) {
+			this.logger.error("Could not open text file: " + textFile, e);
+		} catch (final IOException e) {
+			this.logger.error("Could not read text file: " + textFile + " (charset: " + this.charset + ")", e);
+		} finally {
+			try {
+				if (stream != null) {
+					stream.close();
+				}
+			} catch (final IOException e) {
+				this.logger.warn("Could not close text file: " + textFile, e);
+			}
+		}
+	}
+
+	/**
+	 * @return the name of the charset used to decode file content
+	 */
+	public String getCharset() {
+		return this.charset;
+	}
+
+	/**
+	 * @param charset
+	 *            the name of the charset used to decode file content, e.g. {@code "UTF-8"}
+	 */
+	public void setCharset(final String charset) {
+		this.charset = charset;
+	}
+
+	/**
+	 * @return the port on which the lines are sent
+	 */
+	public OutputPort<String> getOutputPort() {
+		return outputPort;
+	}
+
+}
diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki
index 0e4474577e1f49bc96e734c286b2d9e0363895e8..63ccbbc87bd2c0e6599ca91502149dba3cfb99de 160000
--- a/src/site/markdown/wiki
+++ b/src/site/markdown/wiki
@@ -1 +1 @@
-Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8
+Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de
diff --git a/src/test/java/teetime/stage/WordCountingConfiguration.java b/src/test/java/teetime/stage/WordCountingConfiguration.java
index 31537566379ed6afb5bcd27fef03f9dd83388a2f..facd6c7dd22d93bf9b092dffedb89d520f553a04 100644
--- a/src/test/java/teetime/stage/WordCountingConfiguration.java
+++ b/src/test/java/teetime/stage/WordCountingConfiguration.java
@@ -23,7 +23,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
 import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.basic.distributor.Distributor;
 import teetime.stage.basic.merger.Merger;
-import teetime.stage.io.File2ByteArray;
+import teetime.stage.io.File2Lines;
 import teetime.stage.string.WordCounter;
 import teetime.stage.util.CountingMap;
 
@@ -48,8 +48,7 @@ public class WordCountingConfiguration extends AnalysisConfiguration {
 	public WordCountingConfiguration(final int threads, final File... input) {
 		// First part of the config
 		final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
-		final File2ByteArray f2b = new File2ByteArray();
-		final ByteArray2String b2s = new ByteArray2String();
+		final File2Lines f2b = new File2Lines();
 		final Distributor<String> dist = new Distributor<String>();
 
 		// last part
@@ -62,14 +61,13 @@ public class WordCountingConfiguration extends AnalysisConfiguration {
 
 		// Connecting the stages of the first part of the config
 		intraFact.create(init.getOutputPort(), f2b.getInputPort());
-		intraFact.create(f2b.getOutputPort(), b2s.getInputPort());
-		intraFact.create(b2s.getOutputPort(), dist.getInputPort());
+		intraFact.create(f2b.getOutputPort(), dist.getInputPort());
 
 		// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
 		WordCounter wc;
 		for (int i = 0; i < threads; i++) {
 			wc = new WordCounter();
-			interFact.create(dist.getNewOutputPort(), wc.getInputPort());
+			interFact.create(dist.getNewOutputPort(), wc.getInputPort(), 10000);
 			interFact.create(wc.getOutputPort(), merger.getNewInputPort());
 			// Add WordCounter as a threadable stage, so it runs in its own thread
 			addThreadableStage(wc);
diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java
index 6fec33e68ea191bc3ee82bdf22046e0aea285104..c8cc60408b0889334843a8180a6483cd0f2350fb 100644
--- a/src/test/java/teetime/stage/WordCountingTest.java
+++ b/src/test/java/teetime/stage/WordCountingTest.java
@@ -25,36 +25,29 @@ import org.junit.Test;
 import teetime.framework.Analysis;
 import teetime.stage.util.CountingMap;
 
-import com.google.common.base.CharMatcher;
-import com.google.common.base.Charsets;
-import com.google.common.base.Splitter;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.google.common.io.Files;
-
 public class WordCountingTest {
 
-	private static final File testFile = new File("src/test/resources/data/output.txt");
+	private static final File testFile = new File("src/test/resources/data/consoleText.txt-aa");
 
 	@Test
 	public void test1() throws IOException {
-		int threads = 1;
-		WordCountingConfiguration wcc = new WordCountingConfiguration(threads, testFile, testFile);
+		int threads = 4;
+		WordCountingConfiguration wcc = new WordCountingConfiguration(threads, testFile);
 		Analysis analysis = new Analysis(wcc);
 		analysis.start();
 
 		CountingMap<String> map = wcc.getResult();
-		assertEquals(new Integer(wordOccurrences(testFile).count(new String("diam")) * 2), map.get("diam"));
-		assertEquals(new Integer(wordOccurrences(testFile).count(new String("tation")) * 2), map.get("tation"));
-		assertEquals(new Integer(wordOccurrences(testFile).count(new String("cum")) * 2), map.get("cum"));
-	}
-
-	private Multiset<String> wordOccurrences(final File file) throws IOException {
-		return HashMultiset.create(
-				Splitter.on(CharMatcher.WHITESPACE)
-						.trimResults()
-						.omitEmptyStrings()
-						.split(Files.asCharSource(testFile, Charsets.UTF_8).read()));
+		assertEquals(Integer.valueOf(525059), map.get("rsa"));
+		// assertEquals(new Integer(wordOccurrences(testFile).count(new String("tation"))), map.get("tation"));
+		// assertEquals(new Integer(wordOccurrences(testFile).count(new String("cum"))), map.get("cum"));
 	}
+	//
+	// private Multiset<String> wordOccurrences(final File file) throws IOException {
+	// return HashMultiset.create(
+	// Splitter.on(CharMatcher.WHITESPACE)
+	// .trimResults()
+	// .omitEmptyStrings()
+	// .split(Files.asCharSource(testFile, Charsets.UTF_8).read()));
+	// }
 
 }