Skip to content
Snippets Groups Projects
Commit 834cbb20 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

renamed Pair and removed deprecated methods from ExecutionContext

parent e553e9da
No related branches found
No related tags found
No related merge requests found
...@@ -21,12 +21,7 @@ import java.util.Set; ...@@ -21,12 +21,7 @@ import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
/** /**
* Represents a configuration of connected stages, which is needed to run a analysis. * Represents a configuration of connected stages, which is needed to run a analysis.
...@@ -38,22 +33,6 @@ public abstract class ConfigurationContext extends Configuration { ...@@ -38,22 +33,6 @@ public abstract class ConfigurationContext extends Configuration {
private final Set<Stage> threadableStages = new HashSet<Stage>(); private final Set<Stage> threadableStages = new HashSet<Stage>();
@SuppressWarnings("deprecation")
private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
/**
* Can be used by subclasses, to connect stages
*/
private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true);
Set<Stage> getThreadableStages() { Set<Stage> getThreadableStages() {
return this.threadableStages; return this.threadableStages;
} }
...@@ -71,105 +50,6 @@ public abstract class ConfigurationContext extends Configuration { ...@@ -71,105 +50,6 @@ public abstract class ConfigurationContext extends Configuration {
} }
} }
/**
* Connects two stages with a pipe within the same thread.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return intraThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a bounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interBoundedThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a unbounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return interUnboundedThreadFactory.create(sourcePort, targetPort);
}
/**
* Connects two stages with a bounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param capacity
* capacity of the underlying queue
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interBoundedThreadFactory.create(sourcePort, targetPort, capacity);
}
/**
* Connects two stages with a unbounded pipe within two separate threads.
*
* @param sourcePort
* {@link OutputPort} of the sending stage
* @param targetPort
* {@link InputPort} of the sending stage
* @param capacity
* capacity of the underlying queue
* @param <T>
* the type of elements to be sent
* @return
* the pipe instance which connects the two given stages
*
* @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/
@Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity);
}
/** /**
* Connects two ports with a pipe with a default capacity of currently 4 * Connects two ports with a pipe with a default capacity of currently 4
* *
......
...@@ -31,7 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; ...@@ -31,7 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair; import teetime.util.ThreadThrowableContainer;
/** /**
* Represents an Execution to which stages can be added and executed later. * Represents an Execution to which stages can be added and executed later.
...@@ -59,7 +59,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught ...@@ -59,7 +59,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); private final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer<Thread, Throwable>>();
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
...@@ -306,7 +306,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught ...@@ -306,7 +306,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught
} }
} }
} }
this.exceptions.add(Pair.of(thread, throwable)); this.exceptions.add(ThreadThrowableContainer.of(thread, throwable));
} }
private Set<Stage> traverseIntraStages(final Stage stage) { private Set<Stage> traverseIntraStages(final Stage stage) {
......
...@@ -17,7 +17,7 @@ package teetime.framework; ...@@ -17,7 +17,7 @@ package teetime.framework;
import java.util.Collection; import java.util.Collection;
import teetime.util.Pair; import teetime.util.ThreadThrowableContainer;
/** /**
* Represents a exception, which is thrown by an analysis, if any problems occured within its execution. * Represents a exception, which is thrown by an analysis, if any problems occured within its execution.
...@@ -32,9 +32,9 @@ public class ExecutionException extends RuntimeException { ...@@ -32,9 +32,9 @@ public class ExecutionException extends RuntimeException {
*/ */
private static final long serialVersionUID = 7486086437171884298L; private static final long serialVersionUID = 7486086437171884298L;
private final Collection<Pair<Thread, Throwable>> exceptions; private final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions;
public ExecutionException(final Collection<Pair<Thread, Throwable>> exceptions) { public ExecutionException(final Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions) {
super("Error(s) while running analysis. Check thrown exceptions."); super("Error(s) while running analysis. Check thrown exceptions.");
this.exceptions = exceptions; this.exceptions = exceptions;
} }
...@@ -45,7 +45,7 @@ public class ExecutionException extends RuntimeException { ...@@ -45,7 +45,7 @@ public class ExecutionException extends RuntimeException {
* *
* @return a collection of pairs * @return a collection of pairs
*/ */
public Collection<Pair<Thread, Throwable>> getThrownExceptions() { public Collection<ThreadThrowableContainer<Thread, Throwable>> getThrownExceptions() {
return exceptions; return exceptions;
} }
......
...@@ -20,18 +20,18 @@ package teetime.util; ...@@ -20,18 +20,18 @@ package teetime.util;
*/ */
@Deprecated @Deprecated
// See http://stackoverflow.com/questions/156275/what-is-the-equivalent-of-the-c-pairl-r-in-java // See http://stackoverflow.com/questions/156275/what-is-the-equivalent-of-the-c-pairl-r-in-java
public final class Pair<F, S> { public final class ThreadThrowableContainer<F, S> {
private final F first; private final F first;
private final S second; private final S second;
public Pair(final F first, final S second) { public ThreadThrowableContainer(final F first, final S second) {
this.first = first; this.first = first;
this.second = second; this.second = second;
} }
public static <F, S> Pair<F, S> of(final F first, final S second) { public static <F, S> ThreadThrowableContainer<F, S> of(final F first, final S second) {
return new Pair<F, S>(first, second); return new ThreadThrowableContainer<F, S>(first, second);
} }
public F getFirst() { public F getFirst() {
......
...@@ -24,7 +24,7 @@ import java.util.Collection; ...@@ -24,7 +24,7 @@ import java.util.Collection;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import teetime.util.Pair; import teetime.util.ThreadThrowableContainer;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
...@@ -118,13 +118,13 @@ public class RunnableConsumerStageTest { ...@@ -118,13 +118,13 @@ public class RunnableConsumerStageTest {
} }
private void start(final Execution execution) { private void start(final Execution execution) {
Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>(); Collection<ThreadThrowableContainer<Thread, Throwable>> exceptions = new ArrayList<ThreadThrowableContainer<Thread, Throwable>>();
try { try {
execution.executeBlocking(); execution.executeBlocking();
} catch (ExecutionException e) { } catch (ExecutionException e) {
exceptions = e.getThrownExceptions(); exceptions = e.getThrownExceptions();
} }
for (Pair<Thread, Throwable> pair : exceptions) { for (ThreadThrowableContainer<Thread, Throwable> pair : exceptions) {
System.err.println(pair.getSecond()); System.err.println(pair.getSecond());
System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
throw new AssertionError(pair.getSecond()); throw new AssertionError(pair.getSecond());
......
...@@ -18,6 +18,7 @@ package teetime.framework; ...@@ -18,6 +18,7 @@ package teetime.framework;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
...@@ -36,7 +37,7 @@ public class RunnableConsumerStageTestConfiguration extends ConfigurationContext ...@@ -36,7 +37,7 @@ public class RunnableConsumerStageTestConfiguration extends ConfigurationContext
addThreadableStage(collectorSink); addThreadableStage(collectorSink);
// Can not use createPorts, as the if condition above will lead to an exception // Can not use createPorts, as the if condition above will lead to an exception
connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink; this.collectorSink = collectorSink;
} }
......
...@@ -32,7 +32,7 @@ import org.junit.Test; ...@@ -32,7 +32,7 @@ import org.junit.Test;
import teetime.framework.ConfigurationContext; import teetime.framework.ConfigurationContext;
import teetime.framework.Execution; import teetime.framework.Execution;
import teetime.framework.ExecutionException; import teetime.framework.ExecutionException;
import teetime.util.Pair; import teetime.util.ThreadThrowableContainer;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
...@@ -117,7 +117,7 @@ public class InstanceOfFilterTest { ...@@ -117,7 +117,7 @@ public class InstanceOfFilterTest {
try { try {
execution.executeBlocking(); execution.executeBlocking();
} catch (ExecutionException e) { } catch (ExecutionException e) {
Collection<Pair<Thread, Throwable>> thrownExceptions = e.getThrownExceptions(); Collection<ThreadThrowableContainer<Thread, Throwable>> thrownExceptions = e.getThrownExceptions();
// TODO: handle exception // TODO: handle exception
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment