Skip to content
Snippets Groups Projects
Commit 6532808b authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge remote-tracking branch 'origin/master' into dynamic-distributor

parents ccbe4897 6b4f4788
No related branches found
No related tags found
No related merge requests found
Showing
with 216 additions and 137 deletions
#FindBugs User Preferences #FindBugs User Preferences
#Thu Jun 18 09:21:56 CEST 2015 #Mon Jun 22 16:34:51 CEST 2015
detector_threshold=2 detector_threshold=2
effort=max effort=max
excludefilter0=.fbExcludeFilterFile|true excludefilter0=.fbExcludeFilterFile|true
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
<groupId>net.sourceforge.teetime</groupId> <groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId> <artifactId>teetime</artifactId>
<version>1.2-SNAPSHOT</version> <version>2.0-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>TeeTime</name> <name>TeeTime</name>
......
...@@ -11,10 +11,21 @@ ...@@ -11,10 +11,21 @@
TeeTime automatically TeeTime automatically
chooses the correct type of pipe for all connections. chooses the correct type of pipe for all connections.
</action> </action>
<action dev="ntd" type="add">
Stages without any input port are
automatically executed in a dedicated thread.
</action>
<action dev="ntd" type="fix" issue="93"> <action dev="ntd" type="fix" issue="93">
Introduced a new concept Introduced a new concept
for composing stages. for composing stages.
</action> </action>
<action dev="ntd" type="add" issue="171">
Configurations are now
built within an AnalysisContext which is passed on to nested
CompositeStages.
This removes any constraints on CompositeStages and
enables therefore multiple connections and multithreading.
</action>
<action dev="ntd" type="remove"> <action dev="ntd" type="remove">
Marked Pair class as deprecated. Marked Pair class as deprecated.
</action> </action>
......
...@@ -23,8 +23,34 @@ package teetime.framework; ...@@ -23,8 +23,34 @@ package teetime.framework;
* *
* *
*/ */
public abstract class AbstractCompositeStage extends AnalysisConfiguration { public abstract class AbstractCompositeStage extends Configuration {
protected abstract Stage getFirstStage(); private final ConfigurationContext context;
public AbstractCompositeStage(final ConfigurationContext context) {
if (null == context) {
throw new IllegalArgumentException("Context may not be null.");
}
this.context = context;
}
@Override
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
context.connectPorts(sourcePort, targetPort, capacity);
}
@Override
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, 4);
}
@Override
protected void addThreadableStage(final Stage stage) {
context.addThreadableStage(stage);
}
protected ConfigurationContext getContext() {
return context;
}
} }
package teetime.framework;
public abstract class Configuration {
protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity);
protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort);
protected abstract void addThreadableStage(final Stage stage);
}
...@@ -18,6 +18,9 @@ package teetime.framework; ...@@ -18,6 +18,9 @@ package teetime.framework;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.InstantiationPipe;
...@@ -29,7 +32,9 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; ...@@ -29,7 +32,9 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
* Represents a configuration of connected stages, which is needed to run a analysis. * Represents a configuration of connected stages, which is needed to run a analysis.
* Stages can be added by executing {@link #addThreadableStage(Stage)}. * Stages can be added by executing {@link #addThreadableStage(Stage)}.
*/ */
public abstract class AnalysisConfiguration { public abstract class ConfigurationContext extends Configuration {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
private final Set<Stage> threadableStages = new HashSet<Stage>(); private final Set<Stage> threadableStages = new HashSet<Stage>();
...@@ -59,20 +64,10 @@ public abstract class AnalysisConfiguration { ...@@ -59,20 +64,10 @@ public abstract class AnalysisConfiguration {
* @param stage * @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread. * A arbitrary stage, which will be added to the configuration and executed in a thread.
*/ */
@Override
protected final void addThreadableStage(final Stage stage) { protected final void addThreadableStage(final Stage stage) {
this.threadableStages.add(stage); if (!this.threadableStages.add(stage)) {
} LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
/**
* Execute this method, to add a CompositeStage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary CompositeStage, which will be added to the configuration and executed in a thread.
*/
protected final void addThreadableStage(final AbstractCompositeStage stage) {
this.threadableStages.add(stage.getFirstStage());
for (Stage threadableStage : stage.getThreadableStages()) {
this.addThreadableStage(threadableStage);
} }
} }
...@@ -185,6 +180,7 @@ public abstract class AnalysisConfiguration { ...@@ -185,6 +180,7 @@ public abstract class AnalysisConfiguration {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
*/ */
@Override
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, 4); connectPorts(sourcePort, targetPort, 4);
} }
...@@ -201,7 +197,15 @@ public abstract class AnalysisConfiguration { ...@@ -201,7 +197,15 @@ public abstract class AnalysisConfiguration {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
*/ */
@Override
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) {
addThreadableStage(sourcePort.getOwningStage());
}
if (sourcePort.getPipe() != null || targetPort.getPipe() != null) {
LOGGER.warn("Overwriting existing pipe while connecting stages " +
sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + ".");
}
new InstantiationPipe(sourcePort, targetPort, capacity); new InstantiationPipe(sourcePort, targetPort, capacity);
} }
......
...@@ -34,8 +34,8 @@ import teetime.framework.validation.AnalysisNotValidException; ...@@ -34,8 +34,8 @@ import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair; import teetime.util.Pair;
/** /**
* Represents an Analysis to which stages can be added and executed later. * Represents an Execution to which stages can be added and executed later.
* This needs a {@link AnalysisConfiguration}, * This needs a {@link ConfigurationContext},
* in which the adding and configuring of stages takes place. * in which the adding and configuring of stages takes place.
* To start the analysis {@link #executeBlocking()} needs to be executed. * To start the analysis {@link #executeBlocking()} needs to be executed.
* This class will automatically create threads and join them without any further commitment. * This class will automatically create threads and join them without any further commitment.
...@@ -43,11 +43,11 @@ import teetime.util.Pair; ...@@ -43,11 +43,11 @@ import teetime.util.Pair;
* @author Christian Wulf, Nelson Tavares de Sousa * @author Christian Wulf, Nelson Tavares de Sousa
* *
* @param <T> * @param <T>
* the type of the {@link AnalysisConfiguration} * the type of the {@link ConfigurationContext}
*/ */
public final class Analysis<T extends AnalysisConfiguration> implements UncaughtExceptionHandler { public final class Execution<T extends ConfigurationContext> implements UncaughtExceptionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class);
private final T configuration; private final T configuration;
...@@ -64,32 +64,32 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -64,32 +64,32 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
/** /**
* Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. * Creates a new {@link Execution} that skips validating the port connections and uses the default listener.
* *
* @param configuration * @param configuration
* to be used for the analysis * to be used for the analysis
*/ */
public Analysis(final T configuration) { public Execution(final T configuration) {
this(configuration, false, new IgnoringExceptionListenerFactory()); this(configuration, false, new IgnoringExceptionListenerFactory());
} }
public Analysis(final T configuration, final boolean validationEnabled) { public Execution(final T configuration, final boolean validationEnabled) {
this(configuration, validationEnabled, new IgnoringExceptionListenerFactory()); this(configuration, validationEnabled, new IgnoringExceptionListenerFactory());
} }
/** /**
* Creates a new {@link Analysis} that skips validating the port connections and uses a specific listener. * Creates a new {@link Execution} that skips validating the port connections and uses a specific listener.
* *
* @param configuration * @param configuration
* to be used for the analysis * to be used for the analysis
* @param factory * @param factory
* specific listener for the exception handling * specific listener for the exception handling
*/ */
public Analysis(final T configuration, final IExceptionListenerFactory factory) { public Execution(final T configuration, final IExceptionListenerFactory factory) {
this(configuration, false, factory); this(configuration, false, factory);
} }
public Analysis(final T configuration, final boolean validationEnabled, final IExceptionListenerFactory factory) { public Execution(final T configuration, final boolean validationEnabled, final IExceptionListenerFactory factory) {
this.configuration = configuration; this.configuration = configuration;
this.factory = factory; this.factory = factory;
if (validationEnabled) { if (validationEnabled) {
...@@ -118,8 +118,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -118,8 +118,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
* *
*/ */
private final void init() { private final void init() {
AnalysisInstantiation analysisInstantiation = new AnalysisInstantiation(configuration); ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration);
analysisInstantiation.instantiatePipes(); executionInstantiation.instantiatePipes();
final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages();
if (threadableStageJobs.isEmpty()) { if (threadableStageJobs.isEmpty()) {
...@@ -194,7 +194,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -194,7 +194,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
/** /**
* Calling this method will block the current thread, until the analysis terminates. * Calling this method will block the current thread, until the analysis terminates.
* *
* @throws AnalysisException * @throws ExecutionException
* if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable
* *
* @since 1.1 * @since 1.1
...@@ -209,7 +209,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -209,7 +209,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
thread.join(); thread.join();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.error("Analysis has stopped unexpectedly", e); LOGGER.error("Execution has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) { for (Thread thread : this.finiteProducerThreads) {
thread.interrupt(); thread.interrupt();
} }
...@@ -224,7 +224,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -224,7 +224,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
if (!exceptions.isEmpty()) { if (!exceptions.isEmpty()) {
throw new AnalysisException(exceptions); throw new ExecutionException(exceptions);
} }
} }
...@@ -244,9 +244,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -244,9 +244,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
/** /**
* This method will start the Analysis and block until it is finished. * This method will start the Execution and block until it is finished.
* *
* @throws AnalysisException * @throws ExecutionException
* if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable. * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable.
* *
* @since 1.1 * @since 1.1
...@@ -285,9 +285,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -285,9 +285,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
/** /**
* Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis * Retrieves the Configuration which was used to add and arrange all stages needed for the Execution
* *
* @return the configuration used for the Analysis * @return the configuration used for the Execution
*/ */
public T getConfiguration() { public T getConfiguration() {
return this.configuration; return this.configuration;
......
...@@ -25,7 +25,7 @@ import teetime.util.Pair; ...@@ -25,7 +25,7 @@ import teetime.util.Pair;
* *
* @since 1.1 * @since 1.1
*/ */
public class AnalysisException extends RuntimeException { public class ExecutionException extends RuntimeException {
/** /**
* *
...@@ -34,7 +34,7 @@ public class AnalysisException extends RuntimeException { ...@@ -34,7 +34,7 @@ public class AnalysisException extends RuntimeException {
private final Collection<Pair<Thread, Throwable>> exceptions; private final Collection<Pair<Thread, Throwable>> exceptions;
public AnalysisException(final Collection<Pair<Thread, Throwable>> exceptions) { public ExecutionException(final Collection<Pair<Thread, Throwable>> exceptions) {
super("Error(s) while running analysis. Check thrown exceptions."); super("Error(s) while running analysis. Check thrown exceptions.");
this.exceptions = exceptions; this.exceptions = exceptions;
} }
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework; package teetime.framework;
import java.util.HashMap; import java.util.HashMap;
...@@ -13,22 +28,22 @@ import teetime.framework.pipe.SingleElementPipeFactory; ...@@ -13,22 +28,22 @@ import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.pipe.UnboundedSpScPipeFactory;
class AnalysisInstantiation { class ExecutionInstantiation {
private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisInstantiation.class); private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class);
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private final AnalysisConfiguration configuration; private final ConfigurationContext configuration;
public AnalysisInstantiation(final AnalysisConfiguration configuration) { public ExecutionInstantiation(final ConfigurationContext configuration) {
this.configuration = configuration; this.configuration = configuration;
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final AnalysisConfiguration configuration) { Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) {
Integer createdConnections = new Integer(0); Integer createdConnections = new Integer(0);
Set<Stage> threadableStageJobs = configuration.getThreadableStages(); Set<Stage> threadableStageJobs = configuration.getThreadableStages();
for (OutputPort outputPort : threadableStage.getOutputPorts()) { for (OutputPort outputPort : threadableStage.getOutputPorts()) {
......
...@@ -21,95 +21,85 @@ import teetime.framework.signal.ISignal; ...@@ -21,95 +21,85 @@ import teetime.framework.signal.ISignal;
public class InstantiationPipe implements IPipe { public class InstantiationPipe implements IPipe {
private final InputPort<?> target; private static final String ERROR_MESSAGE = "This must not be called while executing the configuration";
private final InputPort<?> targetPort;
private final int capacity; private final int capacity;
public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
this.target = targetPort; this.targetPort = targetPort;
this.capacity = capacity; this.capacity = capacity;
sourcePort.setPipe(this); sourcePort.setPipe(this);
targetPort.setPipe(this);
} }
public int getCapacity() { public int getCapacity() {
return capacity; return capacity;
} }
@Override
public InputPort<?> getTargetPort() {
return this.targetPort;
}
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return false;
} }
@Override @Override
public boolean addNonBlocking(final Object element) { public boolean addNonBlocking(final Object element) {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return false;
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return false;
} }
@Override @Override
public int size() { public int size() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return 0;
} }
@Override @Override
public Object removeLast() { public Object removeLast() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return null;
}
@Override
public InputPort<?> getTargetPort() {
// TODO Auto-generated method stub
return this.target;
} }
@Override @Override
public void sendSignal(final ISignal signal) { public void sendSignal(final ISignal signal) {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
} }
@Override @Override
public void reportNewElement() { public void reportNewElement() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
} }
@Override @Override
public boolean isClosed() { public boolean isClosed() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return false;
} }
@Override @Override
public boolean hasMore() { public boolean hasMore() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
return false;
} }
@Override @Override
public void waitForStartSignal() throws InterruptedException { public void waitForStartSignal() throws InterruptedException {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
} }
@Override @Override
public void waitForInitializingSignal() throws InterruptedException { public void waitForInitializingSignal() throws InterruptedException {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
} }
@Override @Override
public void close() { public void close() {
// TODO Auto-generated method stub throw new IllegalStateException(ERROR_MESSAGE);
} }
} }
...@@ -19,9 +19,9 @@ import java.util.ArrayList; ...@@ -19,9 +19,9 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import teetime.framework.Analysis; import teetime.framework.ConfigurationContext;
import teetime.framework.AnalysisConfiguration; import teetime.framework.Execution;
import teetime.framework.AnalysisException; import teetime.framework.ExecutionException;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.StageState; import teetime.framework.StageState;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
...@@ -72,24 +72,23 @@ public final class StageTester { ...@@ -72,24 +72,23 @@ public final class StageTester {
/** /**
* This method will start the test and block until it is finished. * This method will start the test and block until it is finished.
* *
* @throws AnalysisException * @throws ExecutionException
* if at least one exception in one thread has occurred within the analysis. * if at least one exception in one thread has occurred within the analysis.
* The exception contains the pairs of thread and throwable. * The exception contains the pairs of thread and throwable.
* *
*/ */
public void start() { public void start() {
final AnalysisConfiguration configuration = new Configuration(inputHolders, stage, outputHolders); final ConfigurationContext configuration = new Configuration(inputHolders, stage, outputHolders);
final Analysis<AnalysisConfiguration> analysis = new Analysis<AnalysisConfiguration>(configuration); final Execution<ConfigurationContext> analysis = new Execution<ConfigurationContext>(configuration);
analysis.executeBlocking(); analysis.executeBlocking();
} }
private final class Configuration extends AnalysisConfiguration { private final class Configuration extends ConfigurationContext {
public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) {
for (InputHolder<?> inputHolder : inputHolders) { for (InputHolder<?> inputHolder : inputHolders) {
final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput());
connectPorts(producer.getOutputPort(), inputHolder.getPort()); connectPorts(producer.getOutputPort(), inputHolder.getPort());
addThreadableStage(producer);
} }
addThreadableStage(stage); addThreadableStage(stage);
......
...@@ -19,6 +19,7 @@ import java.util.ArrayList; ...@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import teetime.framework.AbstractCompositeStage; import teetime.framework.AbstractCompositeStage;
import teetime.framework.ConfigurationContext;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
...@@ -31,7 +32,8 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { ...@@ -31,7 +32,8 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
private final Distributor<T> distributor; private final Distributor<T> distributor;
private final List<Stage> lastStages = new ArrayList<Stage>(); private final List<Stage> lastStages = new ArrayList<Stage>();
public EveryXthPrinter(final int threshold) { public EveryXthPrinter(final int threshold, final ConfigurationContext context) {
super(context);
distributor = new Distributor<T>(new CopyByReferenceStrategy()); distributor = new Distributor<T>(new CopyByReferenceStrategy());
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<Integer> printer = new Printer<Integer>(); Printer<Integer> printer = new Printer<Integer>();
...@@ -50,8 +52,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { ...@@ -50,8 +52,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
return distributor.getNewOutputPort(); return distributor.getNewOutputPort();
} }
@Override public Stage getFirstStage() {
protected Stage getFirstStage() {
return distributor; return distributor;
} }
......
...@@ -18,6 +18,7 @@ package teetime.stage.string; ...@@ -18,6 +18,7 @@ package teetime.stage.string;
import java.util.ArrayList; import java.util.ArrayList;
import teetime.framework.AbstractCompositeStage; import teetime.framework.AbstractCompositeStage;
import teetime.framework.ConfigurationContext;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
...@@ -41,7 +42,8 @@ public final class WordCounter extends AbstractCompositeStage { ...@@ -41,7 +42,8 @@ public final class WordCounter extends AbstractCompositeStage {
private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
// The connection of the different stages is realized within the construction of a instance of this class. // The connection of the different stages is realized within the construction of a instance of this class.
public WordCounter() { public WordCounter(final ConfigurationContext context) {
super(context);
this.lastStages.add(this.mapCounter); this.lastStages.add(this.mapCounter);
final ToLowerCase toLowerCase = new ToLowerCase(); final ToLowerCase toLowerCase = new ToLowerCase();
...@@ -50,8 +52,7 @@ public final class WordCounter extends AbstractCompositeStage { ...@@ -50,8 +52,7 @@ public final class WordCounter extends AbstractCompositeStage {
// connectStages(wordcharacterFilter.getOutputPort(), this.mapCounter.getInputPort()); // connectStages(wordcharacterFilter.getOutputPort(), this.mapCounter.getInputPort());
} }
@Override public Stage getFirstStage() {
protected Stage getFirstStage() {
return this.tokenizer; return this.tokenizer;
} }
......
...@@ -17,7 +17,7 @@ package teetime.examples.cipher; ...@@ -17,7 +17,7 @@ package teetime.examples.cipher;
import java.io.File; import java.io.File;
import teetime.framework.AnalysisConfiguration; import teetime.framework.ConfigurationContext;
import teetime.stage.CipherStage; import teetime.stage.CipherStage;
import teetime.stage.CipherStage.CipherMode; import teetime.stage.CipherStage.CipherMode;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
...@@ -26,7 +26,7 @@ import teetime.stage.ZipByteArray.ZipMode; ...@@ -26,7 +26,7 @@ import teetime.stage.ZipByteArray.ZipMode;
import teetime.stage.io.ByteArrayFileWriter; import teetime.stage.io.ByteArrayFileWriter;
import teetime.stage.io.File2ByteArray; import teetime.stage.io.File2ByteArray;
public class CipherConfiguration extends AnalysisConfiguration { public class CipherConfiguration extends ConfigurationContext {
public CipherConfiguration(final String inputFile, final String outputFile, final String password) { public CipherConfiguration(final String inputFile, final String outputFile, final String password) {
final File input = new File(inputFile); final File input = new File(inputFile);
...@@ -47,8 +47,5 @@ public class CipherConfiguration extends AnalysisConfiguration { ...@@ -47,8 +47,5 @@ public class CipherConfiguration extends AnalysisConfiguration {
connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); connectPorts(decomp.getOutputPort(), decrypt.getInputPort());
connectPorts(decrypt.getOutputPort(), writer.getInputPort()); connectPorts(decrypt.getOutputPort(), writer.getInputPort());
// this.getFiniteProducerStages().add(init);
this.addThreadableStage(init);
} }
} }
...@@ -21,8 +21,8 @@ import java.io.IOException; ...@@ -21,8 +21,8 @@ import java.io.IOException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Execution;
import teetime.framework.AnalysisConfiguration; import teetime.framework.ConfigurationContext;
import com.google.common.io.Files; import com.google.common.io.Files;
...@@ -43,9 +43,9 @@ public class CipherTest { ...@@ -43,9 +43,9 @@ public class CipherTest {
final String outputFile = "src/test/resources/data/output.txt"; final String outputFile = "src/test/resources/data/output.txt";
final String password = "Password"; final String password = "Password";
final AnalysisConfiguration configuration = new CipherConfiguration(inputFile, outputFile, password); final ConfigurationContext configuration = new CipherConfiguration(inputFile, outputFile, password);
final Analysis analysis = new Analysis(configuration); final Execution execution = new Execution(configuration);
analysis.executeBlocking(); execution.executeBlocking();
Assert.assertTrue(Files.equal(new File(inputFile), new File(outputFile))); Assert.assertTrue(Files.equal(new File(inputFile), new File(outputFile)));
} }
......
...@@ -17,7 +17,7 @@ package teetime.examples.tokenizer; ...@@ -17,7 +17,7 @@ package teetime.examples.tokenizer;
import java.io.File; import java.io.File;
import teetime.framework.AnalysisConfiguration; import teetime.framework.ConfigurationContext;
import teetime.stage.ByteArray2String; import teetime.stage.ByteArray2String;
import teetime.stage.CipherStage; import teetime.stage.CipherStage;
import teetime.stage.CipherStage.CipherMode; import teetime.stage.CipherStage.CipherMode;
...@@ -28,7 +28,7 @@ import teetime.stage.ZipByteArray.ZipMode; ...@@ -28,7 +28,7 @@ import teetime.stage.ZipByteArray.ZipMode;
import teetime.stage.io.File2ByteArray; import teetime.stage.io.File2ByteArray;
import teetime.stage.string.Tokenizer; import teetime.stage.string.Tokenizer;
public class TokenizerConfiguration extends AnalysisConfiguration { public class TokenizerConfiguration extends ConfigurationContext {
private final Counter<String> counter; private final Counter<String> counter;
...@@ -50,7 +50,6 @@ public class TokenizerConfiguration extends AnalysisConfiguration { ...@@ -50,7 +50,6 @@ public class TokenizerConfiguration extends AnalysisConfiguration {
connectPorts(b2s.getOutputPort(), tokenizer.getInputPort()); connectPorts(b2s.getOutputPort(), tokenizer.getInputPort());
connectPorts(tokenizer.getOutputPort(), this.counter.getInputPort()); connectPorts(tokenizer.getOutputPort(), this.counter.getInputPort());
this.addThreadableStage(init);
} }
public int getTokenCount() { public int getTokenCount() {
......
...@@ -22,7 +22,7 @@ import java.nio.charset.Charset; ...@@ -22,7 +22,7 @@ import java.nio.charset.Charset;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Execution;
import com.google.common.io.Files; import com.google.common.io.Files;
...@@ -42,8 +42,8 @@ public class TokenizerTest { ...@@ -42,8 +42,8 @@ public class TokenizerTest {
final String password = "Password"; final String password = "Password";
final TokenizerConfiguration configuration = new TokenizerConfiguration(inputFile, password); final TokenizerConfiguration configuration = new TokenizerConfiguration(inputFile, password);
final Analysis analysis = new Analysis(configuration); final Execution execution = new Execution(configuration);
analysis.executeBlocking(); execution.executeBlocking();
final String string = Files.toString(new File("src/test/resources/data/input.txt"), Charset.forName("UTF-8")); final String string = Files.toString(new File("src/test/resources/data/input.txt"), Charset.forName("UTF-8"));
......
...@@ -33,51 +33,50 @@ import teetime.stage.InstanceOfFilter; ...@@ -33,51 +33,50 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.Sink; import teetime.stage.basic.Sink;
import teetime.util.StopWatch; import teetime.util.StopWatch;
public class AnalysisTest { public class ExecutionTest {
private static final long DELAY_IN_MS = 500; private static final long DELAY_IN_MS = 500;
private static final long ABSOLUTE_MAX_ERROR_IN_MS = 2; private static final long ABSOLUTE_MAX_ERROR_IN_MS = 2;
private Analysis<TestConfig> analysis; private Execution<TestConfig> execution;
@Before @Before
public void before() { public void before() {
TestConfig configuration = new TestConfig(); TestConfig configuration = new TestConfig();
analysis = new Analysis<TestConfig>(configuration); execution = new Execution<TestConfig>(configuration);
} }
@Test @Test
public void testExecuteNonBlocking() throws Exception { public void testExecuteNonBlocking() throws Exception {
StopWatch watch = new StopWatch(); StopWatch watch = new StopWatch();
watch.start(); watch.start();
analysis.executeNonBlocking(); execution.executeNonBlocking();
watch.end(); watch.end();
assertThat(watch.getDurationInMs(), is(lessThan(DELAY_IN_MS))); assertThat(watch.getDurationInMs(), is(lessThan(DELAY_IN_MS)));
assertFalse(analysis.getConfiguration().delay.finished); assertFalse(execution.getConfiguration().delay.finished);
analysis.waitForTermination(); execution.waitForTermination();
assertTrue(analysis.getConfiguration().delay.finished); assertTrue(execution.getConfiguration().delay.finished);
} }
@Test @Test
public void testExecuteBlocking() { public void testExecuteBlocking() {
StopWatch watch = new StopWatch(); StopWatch watch = new StopWatch();
watch.start(); watch.start();
analysis.executeBlocking(); execution.executeBlocking();
watch.end(); watch.end();
assertThat(watch.getDurationInMs() + ABSOLUTE_MAX_ERROR_IN_MS, is(greaterThanOrEqualTo(DELAY_IN_MS))); assertThat(watch.getDurationInMs() + ABSOLUTE_MAX_ERROR_IN_MS, is(greaterThanOrEqualTo(DELAY_IN_MS)));
} }
private static class TestConfig extends AnalysisConfiguration { private static class TestConfig extends ConfigurationContext {
public final DelayAndTerminate delay; public final DelayAndTerminate delay;
public TestConfig() { public TestConfig() {
final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello");
delay = new DelayAndTerminate(DELAY_IN_MS); delay = new DelayAndTerminate(DELAY_IN_MS);
connectPorts(init.getOutputPort(), delay.getInputPort()); connectPorts(init.getOutputPort(), delay.getInputPort());
addThreadableStage(init);
} }
} }
...@@ -105,20 +104,19 @@ public class AnalysisTest { ...@@ -105,20 +104,19 @@ public class AnalysisTest {
@Test @Test
public void testInstantiatePipes() throws Exception { public void testInstantiatePipes() throws Exception {
Analysis<AnalysisTestConfig> interAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(true)); Execution<AnalysisTestConfig> interAnalysis = new Execution<AnalysisTestConfig>(new AnalysisTestConfig(true));
assertThat(interAnalysis.getConfiguration().init.getOwningThread(), is(not(interAnalysis.getConfiguration().sink.getOwningThread()))); assertThat(interAnalysis.getConfiguration().init.getOwningThread(), is(not(interAnalysis.getConfiguration().sink.getOwningThread())));
Analysis<AnalysisTestConfig> intraAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(false)); Execution<AnalysisTestConfig> intraAnalysis = new Execution<AnalysisTestConfig>(new AnalysisTestConfig(false));
assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread())); assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread()));
} }
private class AnalysisTestConfig extends AnalysisConfiguration { private class AnalysisTestConfig extends ConfigurationContext {
public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public InitialElementProducer<Object> init = new InitialElementProducer<Object>();
public Sink<Object> sink = new Sink<Object>(); public Sink<Object> sink = new Sink<Object>();
public AnalysisTestConfig(final boolean inter) { public AnalysisTestConfig(final boolean inter) {
connectPorts(init.getOutputPort(), sink.getInputPort()); connectPorts(init.getOutputPort(), sink.getInputPort());
addThreadableStage(init);
if (inter) { if (inter) {
addThreadableStage(sink); addThreadableStage(sink);
} }
...@@ -133,10 +131,10 @@ public class AnalysisTest { ...@@ -133,10 +131,10 @@ public class AnalysisTest {
thrown.expect(IllegalStateException.class); thrown.expect(IllegalStateException.class);
thrown.expectMessage("Crossing threads"); thrown.expectMessage("Crossing threads");
InvalidTestConfig configuration = new InvalidTestConfig(); InvalidTestConfig configuration = new InvalidTestConfig();
new Analysis<InvalidTestConfig>(configuration); new Execution<InvalidTestConfig>(configuration);
} }
private class InvalidTestConfig extends AnalysisConfiguration { private class InvalidTestConfig extends ConfigurationContext {
public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public InitialElementProducer<Object> init = new InitialElementProducer<Object>();
public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class);
public Sink<Object> sink = new Sink<Object>(); public Sink<Object> sink = new Sink<Object>();
...@@ -145,9 +143,37 @@ public class AnalysisTest { ...@@ -145,9 +143,37 @@ public class AnalysisTest {
connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(init.getOutputPort(), iof.getInputPort());
connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
connectPorts(init.createOutputPort(), sink.createInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort());
addThreadableStage(init);
addThreadableStage(iof); addThreadableStage(iof);
} }
} }
@Test
public void automaticallyAddHeadStages() {
AutomaticallyConfig context = new AutomaticallyConfig();
new Execution<ConfigurationContext>(context).executeBlocking();
assertTrue(context.executed);
}
private class AutomaticallyConfig extends ConfigurationContext {
public boolean executed;
public AutomaticallyConfig() {
AutomaticallyAddedStage aas = new AutomaticallyAddedStage();
Sink<Object> sink = new Sink<Object>();
connectPorts(aas.getOutputPort(), sink.getInputPort());
}
private class AutomaticallyAddedStage extends AbstractProducerStage<Object> {
@Override
protected void execute() {
executed = true;
terminate();
}
}
}
} }
...@@ -34,11 +34,11 @@ public class RunnableConsumerStageTest { ...@@ -34,11 +34,11 @@ public class RunnableConsumerStageTest {
public void testWaitingInfinitely() throws Exception { public void testWaitingInfinitely() throws Exception {
RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(); RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration();
final Analysis analysis = new Analysis(configuration); final Execution execution = new Execution(configuration);
final Thread thread = new Thread(new Runnable() { final Thread thread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
start(analysis); start(execution);
} }
}); });
thread.start(); thread.start();
...@@ -59,8 +59,8 @@ public class RunnableConsumerStageTest { ...@@ -59,8 +59,8 @@ public class RunnableConsumerStageTest {
public void testCorrectStartAndTerminatation() throws Exception { public void testCorrectStartAndTerminatation() throws Exception {
RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(0, 1, 2, 3, 5); RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(0, 1, 2, 3, 5);
final Analysis analysis = new Analysis(configuration); final Execution execution = new Execution(configuration);
start(analysis); start(execution);
assertEquals(5, configuration.getCollectedElements().size()); assertEquals(5, configuration.getCollectedElements().size());
} }
...@@ -69,7 +69,7 @@ public class RunnableConsumerStageTest { ...@@ -69,7 +69,7 @@ public class RunnableConsumerStageTest {
// public void testWaitingInfinitely() throws Exception { // public void testWaitingInfinitely() throws Exception {
// WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
// //
// final Analysis analysis = new Analysis(waitStrategyConfiguration); // final Execution analysis = new Execution(waitStrategyConfiguration);
// Thread thread = new Thread(new Runnable() { // Thread thread = new Thread(new Runnable() {
// @Override // @Override
// public void run() { // public void run() {
...@@ -88,7 +88,7 @@ public class RunnableConsumerStageTest { ...@@ -88,7 +88,7 @@ public class RunnableConsumerStageTest {
// public void testWaitingFinitely() throws Exception { // public void testWaitingFinitely() throws Exception {
// WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
// //
// final Analysis analysis = new Analysis(waitStrategyConfiguration); // final Execution analysis = new Execution(waitStrategyConfiguration);
// Thread thread = new Thread(new Runnable() { // Thread thread = new Thread(new Runnable() {
// @Override // @Override
// public void run() { // public void run() {
...@@ -109,19 +109,19 @@ public class RunnableConsumerStageTest { ...@@ -109,19 +109,19 @@ public class RunnableConsumerStageTest {
public void testYieldRun() throws Exception { public void testYieldRun() throws Exception {
YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42);
final Analysis analysis = new Analysis(waitStrategyConfiguration); final Execution execution = new Execution(waitStrategyConfiguration);
start(analysis); start(execution);
assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0));
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size());
} }
private void start(final Analysis analysis) { private void start(final Execution execution) {
Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>(); Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>();
try { try {
analysis.executeBlocking(); execution.executeBlocking();
} catch (AnalysisException e) { } catch (ExecutionException e) {
exceptions = e.getThrownExceptions(); exceptions = e.getThrownExceptions();
} }
for (Pair<Thread, Throwable> pair : exceptions) { for (Pair<Thread, Throwable> pair : exceptions) {
......
...@@ -21,7 +21,7 @@ import java.util.List; ...@@ -21,7 +21,7 @@ import java.util.List;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguration { public class RunnableConsumerStageTestConfiguration extends ConfigurationContext {
private final List<Integer> collectedElements = new ArrayList<Integer>(); private final List<Integer> collectedElements = new ArrayList<Integer>();
private final CollectorSink<Integer> collectorSink; private final CollectorSink<Integer> collectorSink;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment