diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index a9acaebaeb676b792c369d2f588efad2fcbe24e1..896de7f79e527cd513880e484f5ab79775ee32c5 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -21,12 +21,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; /** * Represents a configuration of connected stages, which is needed to run a analysis. @@ -38,22 +33,6 @@ public abstract class ConfigurationContext extends Configuration { private final Set<Stage> threadableStages = new HashSet<Stage>(); - @SuppressWarnings("deprecation") - private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; - - /** - * Can be used by subclasses, to connect stages - */ - private final static IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - /** - * Can be used by subclasses, to connect stages - */ - private final static IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - /** - * Can be used by subclasses, to connect stages - */ - private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); - Set<Stage> getThreadableStages() { return this.threadableStages; } @@ -71,105 +50,6 @@ public abstract class ConfigurationContext extends Configuration { } } - /** - * Connects two stages with a pipe within the same thread. - * - * @param sourcePort - * {@link OutputPort} of the sending stage - * @param targetPort - * {@link InputPort} of the sending stage - * @param <T> - * the type of elements to be sent - * @return - * the pipe instance which connects the two given stages - * - * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. - */ - @Deprecated - protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return intraThreadFactory.create(sourcePort, targetPort); - } - - /** - * Connects two stages with a bounded pipe within two separate threads. - * - * @param sourcePort - * {@link OutputPort} of the sending stage - * @param targetPort - * {@link InputPort} of the sending stage - * @param <T> - * the type of elements to be sent - * @return - * the pipe instance which connects the two given stages - * - * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. - */ - @Deprecated - protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return interBoundedThreadFactory.create(sourcePort, targetPort); - } - - /** - * Connects two stages with a unbounded pipe within two separate threads. - * - * @param sourcePort - * {@link OutputPort} of the sending stage - * @param targetPort - * {@link InputPort} of the sending stage - * @param <T> - * the type of elements to be sent - * @return - * the pipe instance which connects the two given stages - * - * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. - */ - @Deprecated - protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return interUnboundedThreadFactory.create(sourcePort, targetPort); - } - - /** - * Connects two stages with a bounded pipe within two separate threads. - * - * @param sourcePort - * {@link OutputPort} of the sending stage - * @param targetPort - * {@link InputPort} of the sending stage - * @param capacity - * capacity of the underlying queue - * @param <T> - * the type of elements to be sent - * @return - * the pipe instance which connects the two given stages - * - * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. - */ - @Deprecated - protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return interBoundedThreadFactory.create(sourcePort, targetPort, capacity); - } - - /** - * Connects two stages with a unbounded pipe within two separate threads. - * - * @param sourcePort - * {@link OutputPort} of the sending stage - * @param targetPort - * {@link InputPort} of the sending stage - * @param capacity - * capacity of the underlying queue - * @param <T> - * the type of elements to be sent - * @return - * the pipe instance which connects the two given stages - * - * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. - */ - @Deprecated - protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity); - } - /** * Connects two ports with a pipe with a default capacity of currently 4 * diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 042584b4d270d1407c907a77d0b9aa55156472f5..710ec8b6ac48e5d5a5dbd0477d66ca9030a01c07 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -31,7 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; -import teetime.util.Pair; +import teetime.util.ThreadThrowableContainer; /** * Represents an Execution to which stages can be added and executed later. @@ -59,7 +59,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); - private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); + private final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer<Thread, Throwable>>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); @@ -306,7 +306,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught } } } - this.exceptions.add(Pair.of(thread, throwable)); + this.exceptions.add(ThreadThrowableContainer.of(thread, throwable)); } private Set<Stage> traverseIntraStages(final Stage stage) { diff --git a/src/main/java/teetime/framework/ExecutionException.java b/src/main/java/teetime/framework/ExecutionException.java index 0f123784ed18b778d269b83eada2e19c7bf4ada8..60f595ba3a5509d4a2a24856d88b8022f1acefed 100644 --- a/src/main/java/teetime/framework/ExecutionException.java +++ b/src/main/java/teetime/framework/ExecutionException.java @@ -17,7 +17,7 @@ package teetime.framework; import java.util.Collection; -import teetime.util.Pair; +import teetime.util.ThreadThrowableContainer; /** * Represents a exception, which is thrown by an analysis, if any problems occured within its execution. @@ -32,9 +32,9 @@ public class ExecutionException extends RuntimeException { */ private static final long serialVersionUID = 7486086437171884298L; - private final Collection<Pair<Thread, Throwable>> exceptions; + private final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions; - public ExecutionException(final Collection<Pair<Thread, Throwable>> exceptions) { + public ExecutionException(final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions) { super("Error(s) while running analysis. Check thrown exceptions."); this.exceptions = exceptions; } @@ -45,7 +45,7 @@ public class ExecutionException extends RuntimeException { * * @return a collection of pairs */ - public Collection<Pair<Thread, Throwable>> getThrownExceptions() { + public Collection<ThreadThrowableContainer<Thread, Throwable>> getThrownExceptions() { return exceptions; } diff --git a/src/main/java/teetime/util/Pair.java b/src/main/java/teetime/util/ThreadThrowableContainer.java similarity index 79% rename from src/main/java/teetime/util/Pair.java rename to src/main/java/teetime/util/ThreadThrowableContainer.java index 89df09442a63d909f70dd5d444b1e23eff4a28be..ad500d236cd3a59b42d5fa033ab83508ce9fd5d1 100644 --- a/src/main/java/teetime/util/Pair.java +++ b/src/main/java/teetime/util/ThreadThrowableContainer.java @@ -20,18 +20,18 @@ package teetime.util; */ @Deprecated // See http://stackoverflow.com/questions/156275/what-is-the-equivalent-of-the-c-pairl-r-in-java -public final class Pair<F, S> { +public final class ThreadThrowableContainer<F, S> { private final F first; private final S second; - public Pair(final F first, final S second) { + public ThreadThrowableContainer(final F first, final S second) { this.first = first; this.second = second; } - public static <F, S> Pair<F, S> of(final F first, final S second) { - return new Pair<F, S>(first, second); + public static <F, S> ThreadThrowableContainer<F, S> of(final F first, final S second) { + return new ThreadThrowableContainer<F, S>(first, second); } public F getFirst() { diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index b74a255ecdbaa3c2633fd2e83d39e4f556815091..f894429eca0570903508e1f89acd272386515c56 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -24,7 +24,7 @@ import java.util.Collection; import org.junit.Ignore; import org.junit.Test; -import teetime.util.Pair; +import teetime.util.ThreadThrowableContainer; import com.google.common.base.Joiner; @@ -118,13 +118,13 @@ public class RunnableConsumerStageTest { } private void start(final Execution execution) { - Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>(); + Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions = new ArrayList<ThreadThrowableContainer<Thread, Throwable>>(); try { execution.executeBlocking(); } catch (ExecutionException e) { exceptions = e.getThrownExceptions(); } - for (Pair<Thread, Throwable> pair : exceptions) { + for (ThreadThrowableContainer<Thread, Throwable> pair : exceptions) { System.err.println(pair.getSecond()); System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); throw new AssertionError(pair.getSecond()); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 904cae287a8404ed1d314edbc1d6f89e8332bd5b..ed9eabec19e7b7da6258cff1bd9b401e220844d8 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -18,6 +18,7 @@ package teetime.framework; import java.util.ArrayList; import java.util.List; +import teetime.framework.pipe.SpScPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -36,7 +37,7 @@ public class RunnableConsumerStageTestConfiguration extends ConfigurationContext addThreadableStage(collectorSink); // Can not use createPorts, as the if condition above will lead to an exception - connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); + new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index f4152c64ec89bfed0a176d0c9fa76b9e56591973..e582a9b256eb21d0faa76800acc8246c08ccfe43 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -32,7 +32,7 @@ import org.junit.Test; import teetime.framework.ConfigurationContext; import teetime.framework.Execution; import teetime.framework.ExecutionException; -import teetime.util.Pair; +import teetime.util.ThreadThrowableContainer; /** * @author Nils Christian Ehmke @@ -117,7 +117,7 @@ public class InstanceOfFilterTest { try { execution.executeBlocking(); } catch (ExecutionException e) { - Collection<Pair<Thread, Throwable>> thrownExceptions = e.getThrownExceptions(); + Collection<ThreadThrowableContainer<Thread, Throwable>> thrownExceptions = e.getThrownExceptions(); // TODO: handle exception } }