Skip to content
Snippets Groups Projects
Commit b255b9be authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'prework-2.0' into 'master'

Prework 2.0

Some minor things which needed to be done before 2.0

See merge request !45
parents 899636f6 08d11d9e
No related branches found
No related tags found
No related merge requests found
Showing
with 36 additions and 610 deletions
......@@ -55,7 +55,7 @@ To download these builds, add following lines to your project's ``pom.xml``:
<dependency>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<version>1.2-SNAPSHOT</version>
<version>2.0-SNAPSHOT</version>
</dependency>
```
......
package teetime.framework;
/**
*
* Represents a configuration of connected stages.
*
* @author Christian Wulf, Nelson Tavares de Sousa
*
......
......@@ -21,12 +21,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
/**
* Represents a context that is used by a configuration and composite stages to connect ports, for example.
......@@ -36,25 +31,13 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
*/
public final class ConfigurationContext {
private static final int DEFAULT_CAPACITY = 4;
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
private final Set<Stage> threadableStages = new HashSet<Stage>();
@SuppressWarnings("deprecation")
private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true);
ConfigurationContext() {}
Set<Stage> getThreadableStages() {
return this.threadableStages;
......@@ -66,113 +49,14 @@ public final class ConfigurationContext {
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
protected final void addThreadableStage(final Stage stage) {
final void addThreadableStage(final Stage stage) {
if (!this.threadableStages.add(stage)) {
LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
}
}
/**
* Connects two stages with a pipe within the same thread.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return intraThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a bounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interBoundedThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a unbounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interUnboundedThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a bounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param capacity
* capacity of the underlying queue
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interBoundedThreadFactory.create(sourcePort, targetPort, capacity);
}
/**
* Connects two stages with a unbounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param capacity
* capacity of the underlying queue
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity);
}
/**
* Connects two ports with a pipe with a default capacity of currently 4
* Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
......@@ -181,8 +65,8 @@ public final class ConfigurationContext {
* @param <T>
* the type of elements to be sent
*/
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, 4);
final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connectPorts(sourcePort, targetPort, DEFAULT_CAPACITY);
}
/**
......@@ -197,7 +81,7 @@ public final class ConfigurationContext {
* @param <T>
* the type of elements to be sent
*/
protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) {
addThreadableStage(sourcePort.getOwningStage());
}
......
......@@ -31,7 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair;
import teetime.util.ThreadThrowableContainer;
/**
* Represents an Execution to which stages can be added and executed later.
......@@ -44,7 +44,7 @@ import teetime.util.Pair;
*
* @param <T>
* the type of the {@link Configuration}
*
*
* @since 2.0
*/
public final class Execution<T extends Configuration> implements UncaughtExceptionHandler {
......@@ -61,7 +61,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>();
private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>();
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
......@@ -308,7 +308,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
}
}
}
this.exceptions.add(Pair.of(thread, throwable));
this.exceptions.add(ThreadThrowableContainer.of(thread, throwable));
}
private Set<Stage> traverseIntraStages(final Stage stage) {
......
......@@ -17,7 +17,7 @@ package teetime.framework;
import java.util.Collection;
import teetime.util.Pair;
import teetime.util.ThreadThrowableContainer;
/**
* Represents a exception, which is thrown by an analysis, if any problems occured within its execution.
......@@ -32,9 +32,9 @@ public class ExecutionException extends RuntimeException {
*/
private static final long serialVersionUID = 7486086437171884298L;
private final Collection<Pair<Thread, Throwable>> exceptions;
private final Collection<ThreadThrowableContainer> exceptions;
public ExecutionException(final Collection<Pair<Thread, Throwable>> exceptions) {
public ExecutionException(final Collection<ThreadThrowableContainer> exceptions) {
super("Error(s) while running analysis. Check thrown exceptions.");
this.exceptions = exceptions;
}
......@@ -45,7 +45,7 @@ public class ExecutionException extends RuntimeException {
*
* @return a collection of pairs
*/
public Collection<Pair<Thread, Throwable>> getThrownExceptions() {
public Collection<ThreadThrowableContainer> getThrownExceptions() {
return exceptions;
}
......
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
/**
* Represents the interface, which is must be defined in every PipeFactory
......@@ -54,16 +52,6 @@ public interface IPipeFactory {
*/
<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
/**
* @return Type of ThreadCommunication, which is used by the created pipes.
*/
ThreadCommunication getThreadCommunication();
/**
* @return Ordering type, which is used by the created pipes.
*/
PipeOrdering getOrdering();
/**
* @return Whether or not the created pipes are growable
*/
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.classpath.FileSearcher;
final class PipeFactoryLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryLoader.class);
private PipeFactoryLoader() {
// utility class
}
public static List<IPipeFactory> loadFromStream(final InputStream stream) throws IOException {
final List<IPipeFactory> pipeFactories = new LinkedList<IPipeFactory>();
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
try {
String line;
while (null != (line = bufferedReader.readLine())) {
try {
line = line.trim();
if (!line.isEmpty()) {
final Class<?> clazz = Class.forName(line);
final Class<? extends IPipeFactory> pipeFactoryClass = clazz.asSubclass(IPipeFactory.class);
final IPipeFactory pipeFactory = pipeFactoryClass.newInstance();
pipeFactories.add(pipeFactory);
}
} catch (ClassNotFoundException e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Could not find class: " + line, e);
}
} catch (InstantiationException e) {
LOGGER.warn("Could not instantiate pipe factory", e);
} catch (IllegalAccessException e) {
LOGGER.warn("Could not instantiate pipe factory", e);
}
}
} finally {
bufferedReader.close();
}
return pipeFactories;
}
public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) {
List<URL> files = null;
try {
files = FileSearcher.loadResources(configFileName);
} catch (IOException e) {
throw new IllegalStateException(e);
}
return mergeFiles(files);
}
public static List<IPipeFactory> mergeFiles(final List<URL> files) {
final List<IPipeFactory> mergedPipeFactories = new ArrayList<IPipeFactory>();
for (URL url : files) {
try {
final InputStream is = url.openStream();
mergedPipeFactories.addAll(loadFromStream(is));
is.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
return mergedPipeFactories;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a Registry which provides PipeFactories that are used to create pipes.
* The instance of this singleton class is saved in {@link PipeFactoryRegistry#INSTANCE}.
* <p>
* To get a PipeFactory instance, call {@link #getPipeFactory(ThreadCommunication, PipeOrdering, boolean)}.
*
* @deprecated since 1.2
*/
@Deprecated
public final class PipeFactoryRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryRegistry.class);
/**
* Represent a communication type between two connected stages
*/
public enum ThreadCommunication {
INTER, INTRA
}
/**
* Represents the ordering behavior of a pipe
*/
public enum PipeOrdering {
/**
* FIFO
*/
QUEUE_BASED,
/**
* LIFO
*/
STACK_BASED,
ARBITRARY
}
private final Map<String, IPipeFactory> pipeFactories = new HashMap<String, IPipeFactory>();
/**
* The singleton instance of PipeFactoryRegistry
*/
public static final PipeFactoryRegistry INSTANCE = new PipeFactoryRegistry("pipe-factories.conf");
private PipeFactoryRegistry(final String configFileName) {
final List<IPipeFactory> pipeFactories = PipeFactoryLoader.loadPipeFactoriesFromClasspath(configFileName);
for (IPipeFactory pipeFactory : pipeFactories) {
this.register(pipeFactory);
}
}
/**
* Returns a PipeFactory Instance.
*
* @param tc
* Communication type between two connected stages. These are defined in PipeFactoryRegistry.ThreadCommunication
* @param ordering
* Specifies the ordering behavior of the pipe. See PipeFactoryRegistry.PipeOrdering
* @param growable
* Whether the queue size is fixed or not.
* @return
* A PipeFactory, which provides suitable pipes.
*/
public IPipeFactory getPipeFactory(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
final String key = this.buildKey(tc, ordering, growable);
final IPipeFactory pipeFactory = this.pipeFactories.get(key);
if (null == pipeFactory) {
throw new CouldNotFindPipeImplException(key);
}
return pipeFactory;
}
/**
* Adds a new PipeFactory to the registry.
* The new PipeFactory will be automatically selected by the Registry, if it is the most suitable Factory
* corresponding to the requirements.
*
* @param pipeFactory
* A PipeFactory which will be added to the registry
*/
public void register(final IPipeFactory pipeFactory) {
final String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
this.pipeFactories.put(key, pipeFactory);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName());
}
}
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable;
}
}
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class SingleElementPipeFactory implements IPipeFactory {
......@@ -37,16 +35,6 @@ public final class SingleElementPipeFactory implements IPipeFactory {
return new SingleElementPipe(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.ARBITRARY;
}
@Override
public boolean isGrowable() {
return false;
......
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class SpScPipeFactory implements IPipeFactory {
......@@ -32,16 +30,6 @@ public final class SpScPipeFactory implements IPipeFactory {
return new SpScPipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
......
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public class UnboundedSpScPipeFactory implements IPipeFactory {
......@@ -37,16 +35,6 @@ public class UnboundedSpScPipeFactory implements IPipeFactory {
return new UnboundedSpScPipe(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
......
......@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains all pipes related classes, such as factories and pre-defined pipes.
*/
......
......@@ -15,30 +15,25 @@
*/
package teetime.util;
/**
* @deprecated since 1.2
*/
@Deprecated
// See http://stackoverflow.com/questions/156275/what-is-the-equivalent-of-the-c-pairl-r-in-java
public final class Pair<F, S> {
public final class ThreadThrowableContainer {
private final F first;
private final S second;
private final Thread first;
private final Throwable second;
public Pair(final F first, final S second) {
public ThreadThrowableContainer(final Thread first, final Throwable second) {
this.first = first;
this.second = second;
}
public static <F, S> Pair<F, S> of(final F first, final S second) {
return new Pair<F, S>(first, second);
public static ThreadThrowableContainer of(final Thread first, final Throwable second) {
return new ThreadThrowableContainer(first, second);
}
public F getFirst() {
public Thread getThread() {
return this.first;
}
public S getSecond() {
public Throwable getThrowable() {
return this.second;
}
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.util.classpath;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @param <T>
* the type that is used to cast a type that was found in the class path
*
* @author Christian Wulf
*/
public final class CachedClassForNameResolver<T> {
private final ConcurrentMap<String, Class<? extends T>> cachedClasses = new ConcurrentHashMap<String, Class<? extends T>>(); // NOCS
private final ClassForNameResolver<T> classForNameResolver;
public CachedClassForNameResolver(final ClassForNameResolver<T> classForNameResolver) {
this.classForNameResolver = classForNameResolver;
}
/**
* This method tries to find a class with the given name.
*
* @param classname
* The name of the class.
*
* @return A {@link Class} instance corresponding to the given name, if it exists.
*
* @throws ClassNotFoundException
* thrown iff no class was found for the given <b>classname</b>
*/
public final Class<? extends T> classForName(final String classname) throws ClassNotFoundException {
Class<? extends T> clazz = this.cachedClasses.get(classname);
if (clazz == null) {
clazz = this.classForNameResolver.classForName(classname);
final Class<? extends T> previousClass = this.cachedClasses.putIfAbsent(classname, clazz);
if (null != previousClass) {
clazz = previousClass;
}
}
return clazz;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.util.classpath;
/**
* @param <T>
* the type that is used to cast a type that was found in the class path
*
* @author Christian Wulf
* @since 1.11
*/
public final class ClassForNameResolver<T> {
private final Class<T> classToCast;
public ClassForNameResolver(final Class<T> classToCast) {
this.classToCast = classToCast;
}
/**
* This method tries to find a class with the given name.
*
* @param classname
* The name of the class.
*
* @return A {@link Class} instance corresponding to the given name, if it exists.
* @throws ClassNotFoundException
* thrown iff no class was found for the given <b>classname</b>
*
*/
public final Class<? extends T> classForName(final String classname) throws ClassNotFoundException {
final Class<?> clazz = Class.forName(classname);
return clazz.asSubclass(this.classToCast);
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.util.classpath;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
public final class FileSearcher {
private static final ClassLoader CLASS_LOADER = ClassLoader.getSystemClassLoader();
private FileSearcher() {
// utility class
}
public static List<URL> loadResources(final String name) throws IOException {
final List<URL> urls = new ArrayList<URL>();
final Enumeration<URL> systemRes = CLASS_LOADER.getResources(name);
while (systemRes.hasMoreElements()) {
urls.add(systemRes.nextElement());
}
return urls;
}
}
wiki @ 0a5bd4dd
Subproject commit 0a5bd4ddb82684ce1ae2ec84c67ff2117ebff143
......@@ -24,7 +24,7 @@ import java.util.Collection;
import org.junit.Ignore;
import org.junit.Test;
import teetime.util.Pair;
import teetime.util.ThreadThrowableContainer;
import com.google.common.base.Joiner;
......@@ -118,16 +118,16 @@ public class RunnableConsumerStageTest {
}
private void start(final Execution execution) {
Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>();
Collection<ThreadThrowableContainer> exceptions = new ArrayList<ThreadThrowableContainer>();
try {
execution.executeBlocking();
} catch (ExecutionException e) {
exceptions = e.getThrownExceptions();
}
for (Pair<Thread, Throwable> pair : exceptions) {
System.err.println(pair.getSecond());
System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
throw new AssertionError(pair.getSecond());
for (ThreadThrowableContainer pair : exceptions) {
System.err.println(pair.getThrowable());
System.err.println(Joiner.on("\n").join(pair.getThrowable().getStackTrace()));
throw new AssertionError(pair.getThrowable());
}
assertEquals(0, exceptions.size());
}
......
......@@ -18,6 +18,7 @@ package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -36,7 +37,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
addThreadableStage(collectorSink);
// Can not use createPorts, as the if condition above will lead to an exception
ConfigurationContext.connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort());
new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
}
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class DummyFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SpScPipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment