Skip to content
Snippets Groups Projects
Commit 20b10cf8 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

clean-up - teetime.framework.pipe

parent ade9076a
No related branches found
No related tags found
No related merge requests found
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
/**
* Represents the interface, which is must be defined in every PipeFactory
......@@ -54,16 +52,6 @@ public interface IPipeFactory {
*/
<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
/**
* @return Type of ThreadCommunication, which is used by the created pipes.
*/
ThreadCommunication getThreadCommunication();
/**
* @return Ordering type, which is used by the created pipes.
*/
PipeOrdering getOrdering();
/**
* @return Whether or not the created pipes are growable
*/
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.classpath.FileSearcher;
final class PipeFactoryLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryLoader.class);
private PipeFactoryLoader() {
// utility class
}
public static List<IPipeFactory> loadFromStream(final InputStream stream) throws IOException {
final List<IPipeFactory> pipeFactories = new LinkedList<IPipeFactory>();
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
try {
String line;
while (null != (line = bufferedReader.readLine())) {
try {
line = line.trim();
if (!line.isEmpty()) {
final Class<?> clazz = Class.forName(line);
final Class<? extends IPipeFactory> pipeFactoryClass = clazz.asSubclass(IPipeFactory.class);
final IPipeFactory pipeFactory = pipeFactoryClass.newInstance();
pipeFactories.add(pipeFactory);
}
} catch (ClassNotFoundException e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Could not find class: " + line, e);
}
} catch (InstantiationException e) {
LOGGER.warn("Could not instantiate pipe factory", e);
} catch (IllegalAccessException e) {
LOGGER.warn("Could not instantiate pipe factory", e);
}
}
} finally {
bufferedReader.close();
}
return pipeFactories;
}
public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) {
List<URL> files = null;
try {
files = FileSearcher.loadResources(configFileName);
} catch (IOException e) {
throw new IllegalStateException(e);
}
return mergeFiles(files);
}
public static List<IPipeFactory> mergeFiles(final List<URL> files) {
final List<IPipeFactory> mergedPipeFactories = new ArrayList<IPipeFactory>();
for (URL url : files) {
try {
final InputStream is = url.openStream();
mergedPipeFactories.addAll(loadFromStream(is));
is.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
return mergedPipeFactories;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a Registry which provides PipeFactories that are used to create pipes.
* The instance of this singleton class is saved in {@link PipeFactoryRegistry#INSTANCE}.
* <p>
* To get a PipeFactory instance, call {@link #getPipeFactory(ThreadCommunication, PipeOrdering, boolean)}.
*
* @deprecated since 1.2
*/
@Deprecated
public final class PipeFactoryRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryRegistry.class);
/**
* Represent a communication type between two connected stages
*/
public enum ThreadCommunication {
INTER, INTRA
}
/**
* Represents the ordering behavior of a pipe
*/
public enum PipeOrdering {
/**
* FIFO
*/
QUEUE_BASED,
/**
* LIFO
*/
STACK_BASED,
ARBITRARY
}
private final Map<String, IPipeFactory> pipeFactories = new HashMap<String, IPipeFactory>();
/**
* The singleton instance of PipeFactoryRegistry
*/
public static final PipeFactoryRegistry INSTANCE = new PipeFactoryRegistry("pipe-factories.conf");
private PipeFactoryRegistry(final String configFileName) {
final List<IPipeFactory> pipeFactories = PipeFactoryLoader.loadPipeFactoriesFromClasspath(configFileName);
for (IPipeFactory pipeFactory : pipeFactories) {
this.register(pipeFactory);
}
}
/**
* Returns a PipeFactory Instance.
*
* @param tc
* Communication type between two connected stages. These are defined in PipeFactoryRegistry.ThreadCommunication
* @param ordering
* Specifies the ordering behavior of the pipe. See PipeFactoryRegistry.PipeOrdering
* @param growable
* Whether the queue size is fixed or not.
* @return
* A PipeFactory, which provides suitable pipes.
*/
public IPipeFactory getPipeFactory(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
final String key = this.buildKey(tc, ordering, growable);
final IPipeFactory pipeFactory = this.pipeFactories.get(key);
if (null == pipeFactory) {
throw new CouldNotFindPipeImplException(key);
}
return pipeFactory;
}
/**
* Adds a new PipeFactory to the registry.
* The new PipeFactory will be automatically selected by the Registry, if it is the most suitable Factory
* corresponding to the requirements.
*
* @param pipeFactory
* A PipeFactory which will be added to the registry
*/
public void register(final IPipeFactory pipeFactory) {
final String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
this.pipeFactories.put(key, pipeFactory);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName());
}
}
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable;
}
}
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class SingleElementPipeFactory implements IPipeFactory {
......@@ -37,16 +35,6 @@ public final class SingleElementPipeFactory implements IPipeFactory {
return new SingleElementPipe(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.ARBITRARY;
}
@Override
public boolean isGrowable() {
return false;
......
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class SpScPipeFactory implements IPipeFactory {
......@@ -32,16 +30,6 @@ public final class SpScPipeFactory implements IPipeFactory {
return new SpScPipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
......
......@@ -17,8 +17,6 @@ package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public class UnboundedSpScPipeFactory implements IPipeFactory {
......@@ -37,16 +35,6 @@ public class UnboundedSpScPipeFactory implements IPipeFactory {
return new UnboundedSpScPipe(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
......
......@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains all pipes related classes, such as factories and pre-defined pipes.
*/
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class DummyFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SpScPipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import teetime.util.classpath.ClassForNameResolver;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
public class PipeFactoryLoaderTest {
public PipeFactoryLoaderTest() {}
@Test
public void emptyConfig() throws IOException {
final List<IPipeFactory> pipeFactories = PipeFactoryLoader.loadPipeFactoriesFromClasspath("data/empty-test.conf");
Assert.assertEquals(true, pipeFactories.isEmpty());
}
@Test
public void singleConfig() throws IOException {
final List<IPipeFactory> pipeFactories = PipeFactoryLoader.loadPipeFactoriesFromClasspath("pipe-factories.conf");
final int lines = Files.readLines(new File("target/classes/pipe-factories.conf"), Charsets.UTF_8).size();
Assert.assertEquals(lines, pipeFactories.size());
}
@Test
public void multipleConfigs() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
final List<URL> files = new ArrayList<URL>();
final File pipeConfig = new File("target/classes/pipe-factories.conf");
final File testConfig = new File("target/test-classes/data/normal-test.conf");
files.add(testConfig.toURI().toURL());
files.add(pipeConfig.toURI().toURL());
final List<IPipeFactory> pipeFactories = PipeFactoryLoader.mergeFiles(files);
final List<String> contents = Files.readLines(pipeConfig, Charsets.UTF_8);
contents.addAll(Files.readLines(testConfig, Charsets.UTF_8));
// Check if all read factories are contained in one of the files
for (IPipeFactory iPipeFactory : pipeFactories) {
Assert.assertTrue(contents.indexOf(iPipeFactory.getClass().getCanonicalName()) != -1);
}
// Second part of the test: PipeFactoryRegistry
final PipeFactoryRegistry pipeRegistry = PipeFactoryRegistry.INSTANCE;
final ClassForNameResolver<IPipeFactory> classResolver = new ClassForNameResolver<IPipeFactory>(IPipeFactory.class);
// Look for the "normal" pipes
for (String className : Files.readLines(pipeConfig, Charsets.UTF_8)) {
final IPipeFactory pipeFactory = classResolver.classForName(className).newInstance();
final IPipeFactory returnedFactory = pipeRegistry.getPipeFactory(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(),
pipeFactory.isGrowable());
Assert.assertEquals(pipeFactory.getClass().getCanonicalName(), returnedFactory.getClass().getCanonicalName());
}
// Second "and a half" part
for (String className : Files.readLines(testConfig, Charsets.UTF_8)) {
final IPipeFactory pipeFactory = classResolver.classForName(className).newInstance();
// Still old factory
IPipeFactory returnedFactory = pipeRegistry.getPipeFactory(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
Assert.assertNotEquals(pipeFactory.getClass().getCanonicalName(), returnedFactory.getClass().getCanonicalName());
// Overload factory and check for the new one
pipeRegistry.register(pipeFactory);
returnedFactory = pipeRegistry.getPipeFactory(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
Assert.assertEquals(pipeFactory.getClass().getCanonicalName(), returnedFactory.getClass().getCanonicalName());
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment