Skip to content
Snippets Groups Projects
Commit d27b36b4 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

some refactoring

parent bdea8523
No related branches found
No related tags found
No related merge requests found
...@@ -17,10 +17,8 @@ package teetime.framework; ...@@ -17,10 +17,8 @@ package teetime.framework;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
...@@ -30,11 +28,6 @@ import org.slf4j.LoggerFactory; ...@@ -30,11 +28,6 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
...@@ -68,11 +61,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -68,11 +61,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>();
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private int createdConnections = 0;
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
/** /**
...@@ -112,7 +100,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -112,7 +100,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
// BETTER validate concurrently // BETTER validate concurrently
private void validateStages() { private void validateStages() {
final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
// } // }
...@@ -131,9 +119,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -131,9 +119,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
*/ */
private final void init() { private final void init() {
instantiatePipes(); AnalysisInstantiation.instantiatePipes(configuration);
final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages();
if (threadableStageJobs.isEmpty()) { if (threadableStageJobs.isEmpty()) {
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
} }
...@@ -189,53 +177,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -189,53 +177,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
return thread; return thread;
} }
private void instantiatePipes() {
Integer i = new Integer(0);
Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (Stage threadableStage : threadableStageJobs) {
i++;
colors.put(threadableStage, i);
colorAndConnectStages(i, colors, threadableStage);
}
LOGGER.debug("Created " + createdConnections + "connections");
}
@SuppressWarnings("rawtypes")
private void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) {
Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) {
InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe;
Stage targetStage = pipe.getTargetPort().getOwningStage();
Integer targetColor = new Integer(0);
if (colors.containsKey(targetStage)) {
targetColor = colors.get(targetStage);
}
if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) {
if (pipe.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTarget(), pipe.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTarget(), 4);
}
} else {
if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
}
intraThreadPipeFactory.create(outputPort, pipe.getTarget());
colors.put(targetStage, i);
colorAndConnectStages(i, colors, targetStage);
}
createdConnections++;
}
}
}
}
private Thread createThread(final AbstractRunnableStage runnable, final String name) { private Thread createThread(final AbstractRunnableStage runnable, final String name) {
final Thread thread = new Thread(runnable); final Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(this); thread.setUncaughtExceptionHandler(this);
...@@ -357,7 +298,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -357,7 +298,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
if (!executionInterrupted) { if (!executionInterrupted) {
executionInterrupted = true; executionInterrupted = true;
LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now.");
for (Stage stage : configuration.getThreadableStageJobs()) { for (Stage stage : configuration.getThreadableStages()) {
if (stage.getOwningThread() != thread) { if (stage.getOwningThread() != thread) {
if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) {
stage.terminate(); stage.terminate();
......
...@@ -49,7 +49,7 @@ public abstract class AnalysisConfiguration { ...@@ -49,7 +49,7 @@ public abstract class AnalysisConfiguration {
*/ */
private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true);
Set<Stage> getThreadableStageJobs() { Set<Stage> getThreadableStages() {
return this.threadableStageJobs; return this.threadableStageJobs;
} }
...@@ -71,7 +71,7 @@ public abstract class AnalysisConfiguration { ...@@ -71,7 +71,7 @@ public abstract class AnalysisConfiguration {
*/ */
protected final void addThreadableStage(final AbstractCompositeStage stage) { protected final void addThreadableStage(final AbstractCompositeStage stage) {
this.threadableStageJobs.add(stage.getFirstStage()); this.threadableStageJobs.add(stage.getFirstStage());
for (Stage threadableStage : stage.getThreadableStageJobs()) { for (Stage threadableStage : stage.getThreadableStages()) {
this.addThreadableStage(threadableStage); this.addThreadableStage(threadableStage);
} }
} }
......
package teetime.framework;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory;
class AnalysisInstantiation {
private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisInstantiation.class);
private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
@SuppressWarnings("rawtypes")
static Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final AnalysisConfiguration configuration) {
Integer createdConnections = new Integer(0);
Set<Stage> threadableStageJobs = configuration.getThreadableStages();
for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) {
InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe;
Stage targetStage = pipe.getTargetPort().getOwningStage();
Integer targetColor = new Integer(0);
if (colors.containsKey(targetStage)) {
targetColor = colors.get(targetStage);
}
if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) {
if (pipe.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
}
} else {
if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
}
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
colors.put(targetStage, i);
createdConnections += colorAndConnectStages(i, colors, targetStage, configuration);
}
createdConnections++;
}
}
}
return createdConnections;
}
static void instantiatePipes(final AnalysisConfiguration configuration) {
Integer i = new Integer(0);
Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages();
Integer createdConnections = 0;
for (Stage threadableStage : threadableStageJobs) {
i++;
colors.put(threadableStage, i);
createdConnections = AnalysisInstantiation.colorAndConnectStages(i, colors, threadableStage, configuration);
}
LOGGER.debug("Created " + createdConnections + "connections");
}
}
...@@ -15,17 +15,16 @@ ...@@ -15,17 +15,16 @@
*/ */
package teetime.framework.pipe; package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
public class InstantiationPipe<T> extends AbstractIntraThreadPipe { public class InstantiationPipe<T> implements IPipe {
private final InputPort<T> target; private final InputPort<T> target;
private final int capacity; private final int capacity;
public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.target = targetPort; this.target = targetPort;
this.capacity = capacity; this.capacity = capacity;
sourcePort.setPipe(this); sourcePort.setPipe(this);
...@@ -35,28 +34,88 @@ public class InstantiationPipe<T> extends AbstractIntraThreadPipe { ...@@ -35,28 +34,88 @@ public class InstantiationPipe<T> extends AbstractIntraThreadPipe {
return capacity; return capacity;
} }
public InputPort<T> getTarget() { @Override
return target; public boolean add(final Object element) {
// TODO Auto-generated method stub
return false;
} }
@Override @Override
public boolean add(final Object element) { public boolean addNonBlocking(final Object element) {
throw new IllegalStateException("Should not be called"); // TODO Auto-generated method stub
return false;
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
throw new IllegalStateException("Should not be called"); // TODO Auto-generated method stub
return false;
} }
@Override @Override
public int size() { public int size() {
throw new IllegalStateException("Should not be called"); // TODO Auto-generated method stub
return 0;
} }
@Override @Override
public Object removeLast() { public Object removeLast() {
throw new IllegalStateException("Should not be called"); // TODO Auto-generated method stub
return null;
}
@Override
public InputPort<?> getTargetPort() {
// TODO Auto-generated method stub
return this.target;
}
@Override
public void sendSignal(final ISignal signal) {
// TODO Auto-generated method stub
}
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
// TODO Auto-generated method stub
}
@Override
public void reportNewElement() {
// TODO Auto-generated method stub
}
@Override
public boolean isClosed() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean hasMore() {
// TODO Auto-generated method stub
return false;
}
@Override
public void waitForStartSignal() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public void waitForInitializingSignal() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment