Skip to content
Snippets Groups Projects
Commit 0d73dd20 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

New concept to enable multithreading in nested stages

parent f2392748
No related branches found
No related tags found
No related merge requests found
......@@ -23,8 +23,33 @@ package teetime.framework;
*
*
*/
public abstract class AbstractCompositeStage extends AnalysisConfiguration {
public abstract class AbstractCompositeStage extends Network {
protected abstract Stage getFirstStage();
private final AnalysisConfiguration context;
public abstract Stage getFirstStage();
public AbstractCompositeStage(final AnalysisConfiguration context) {
this.context = context;
}
@Override
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
context.connectPorts(sourcePort, targetPort, capacity);
}
@Override
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, 4);
}
@Override
protected void addThreadableStage(final Stage stage) {
context.addThreadableStage(stage);
}
AnalysisConfiguration getContext() {
return context;
}
}
......@@ -29,7 +29,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
* Represents a configuration of connected stages, which is needed to run a analysis.
* Stages can be added by executing {@link #addThreadableStage(Stage)}.
*/
public abstract class AnalysisConfiguration {
public abstract class AnalysisConfiguration extends Network {
private final Set<Stage> threadableStages = new HashSet<Stage>();
......@@ -59,23 +59,11 @@ public abstract class AnalysisConfiguration {
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
@Override
protected final void addThreadableStage(final Stage stage) {
this.threadableStages.add(stage);
}
/**
* Execute this method, to add a CompositeStage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary CompositeStage, which will be added to the configuration and executed in a thread.
*/
protected final void addThreadableStage(final AbstractCompositeStage stage) {
this.threadableStages.add(stage.getFirstStage());
for (Stage threadableStage : stage.getThreadableStages()) {
this.addThreadableStage(threadableStage);
}
}
/**
* Connects two stages with a pipe within the same thread.
*
......@@ -185,6 +173,7 @@ public abstract class AnalysisConfiguration {
* @param <T>
* the type of elements to be sent
*/
@Override
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, 4);
}
......@@ -201,6 +190,7 @@ public abstract class AnalysisConfiguration {
* @param <T>
* the type of elements to be sent
*/
@Override
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
new InstantiationPipe(sourcePort, targetPort, capacity);
}
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
import java.util.HashMap;
......
package teetime.framework;
public abstract class Network {
protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity);
protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort);
protected abstract void addThreadableStage(final Stage stage);
}
......@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.List;
import teetime.framework.AbstractCompositeStage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
......@@ -31,7 +32,8 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
private final Distributor<T> distributor;
private final List<Stage> lastStages = new ArrayList<Stage>();
public EveryXthPrinter(final int threshold) {
public EveryXthPrinter(final int threshold, final AnalysisConfiguration context) {
super(context);
distributor = new Distributor<T>(new CopyByReferenceStrategy());
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<Integer> printer = new Printer<Integer>();
......@@ -51,7 +53,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
}
@Override
protected Stage getFirstStage() {
public Stage getFirstStage() {
return distributor;
}
......
......@@ -18,6 +18,7 @@ package teetime.stage.string;
import java.util.ArrayList;
import teetime.framework.AbstractCompositeStage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
......@@ -41,7 +42,8 @@ public final class WordCounter extends AbstractCompositeStage {
private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
// The connection of the different stages is realized within the construction of a instance of this class.
public WordCounter() {
public WordCounter(final AnalysisConfiguration context) {
super(context);
this.lastStages.add(this.mapCounter);
final ToLowerCase toLowerCase = new ToLowerCase();
......@@ -51,7 +53,7 @@ public final class WordCounter extends AbstractCompositeStage {
}
@Override
protected Stage getFirstStage() {
public Stage getFirstStage() {
return this.tokenizer;
}
......
......@@ -77,13 +77,13 @@ public class TraversorTest {
// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
for (int i = 0; i < threads; i++) {
// final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>();
final WordCounter wc = new WordCounter();
final WordCounter wc = new WordCounter(this);
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
connectPorts(distributor.getNewOutputPort(), wc.getInputPort());
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc);
addThreadableStage(wc.getFirstStage());
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment