diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index e2737f87fe663b1fa4cd3caae0fb35c775d469ba..ea917227e8417d3e96de2d8ccd7e0a4a8e30ff3c 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -17,10 +17,8 @@ package teetime.framework; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -30,11 +28,6 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -68,11 +61,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); - private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private int createdConnections = 0; - private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); /** @@ -112,7 +100,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); // } @@ -131,9 +119,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught */ private final void init() { - instantiatePipes(); + AnalysisInstantiation.instantiatePipes(configuration); - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -189,53 +177,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught return thread; } - private void instantiatePipes() { - Integer i = new Integer(0); - Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Stage threadableStage : threadableStageJobs) { - i++; - colors.put(threadableStage, i); - colorAndConnectStages(i, colors, threadableStage); - } - LOGGER.debug("Created " + createdConnections + "connections"); - } - - @SuppressWarnings("rawtypes") - private void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) { - Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (OutputPort outputPort : threadableStage.getOutputPorts()) { - if (outputPort.pipe != null) { - if (outputPort.pipe instanceof InstantiationPipe) { - InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; - Stage targetStage = pipe.getTargetPort().getOwningStage(); - Integer targetColor = new Integer(0); - if (colors.containsKey(targetStage)) { - targetColor = colors.get(targetStage); - } - if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { - if (pipe.getCapacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTarget(), pipe.getCapacity()); - } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTarget(), 4); - } - } else { - if (colors.containsKey(targetStage)) { - if (!colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") - } - } - intraThreadPipeFactory.create(outputPort, pipe.getTarget()); - colors.put(targetStage, i); - colorAndConnectStages(i, colors, targetStage); - } - createdConnections++; - } - } - - } - } - private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); @@ -357,7 +298,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getThreadableStageJobs()) { + for (Stage stage : configuration.getThreadableStages()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 8c3c92780444aea1a8a48d488df067f99815946d..8edb7c391b4d25d1b17ccd7f4ea88e2ed0f92844 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -49,7 +49,7 @@ public abstract class AnalysisConfiguration { */ private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); - Set<Stage> getThreadableStageJobs() { + Set<Stage> getThreadableStages() { return this.threadableStageJobs; } @@ -71,7 +71,7 @@ public abstract class AnalysisConfiguration { */ protected final void addThreadableStage(final AbstractCompositeStage stage) { this.threadableStageJobs.add(stage.getFirstStage()); - for (Stage threadableStage : stage.getThreadableStageJobs()) { + for (Stage threadableStage : stage.getThreadableStages()) { this.addThreadableStage(threadableStage); } } diff --git a/src/main/java/teetime/framework/AnalysisInstantiation.java b/src/main/java/teetime/framework/AnalysisInstantiation.java new file mode 100644 index 0000000000000000000000000000000000000000..e659e4e77f89f5fabb2d8b31a2c5182401c79ec1 --- /dev/null +++ b/src/main/java/teetime/framework/AnalysisInstantiation.java @@ -0,0 +1,74 @@ +package teetime.framework; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; + +class AnalysisInstantiation { + + private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisInstantiation.class); + + private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + + @SuppressWarnings("rawtypes") + static Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final AnalysisConfiguration configuration) { + Integer createdConnections = new Integer(0); + Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + for (OutputPort outputPort : threadableStage.getOutputPorts()) { + if (outputPort.pipe != null) { + if (outputPort.pipe instanceof InstantiationPipe) { + InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; + Stage targetStage = pipe.getTargetPort().getOwningStage(); + Integer targetColor = new Integer(0); + if (colors.containsKey(targetStage)) { + targetColor = colors.get(targetStage); + } + if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { + if (pipe.getCapacity() != 0) { + interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); + } else { + interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + } + } else { + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(i)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + colors.put(targetStage, i); + createdConnections += colorAndConnectStages(i, colors, targetStage, configuration); + } + createdConnections++; + } + } + + } + return createdConnections; + } + + static void instantiatePipes(final AnalysisConfiguration configuration) { + Integer i = new Integer(0); + Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Integer createdConnections = 0; + for (Stage threadableStage : threadableStageJobs) { + i++; + colors.put(threadableStage, i); + createdConnections = AnalysisInstantiation.colorAndConnectStages(i, colors, threadableStage, configuration); + } + LOGGER.debug("Created " + createdConnections + "connections"); + } + +} diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index 513bf516fab09863b5ac9a72b3f1b429cef993d8..799ed32400fb2c28858dc62b4ed74ee55294e9b0 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -15,17 +15,16 @@ */ package teetime.framework.pipe; -import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.signal.ISignal; -public class InstantiationPipe<T> extends AbstractIntraThreadPipe { +public class InstantiationPipe<T> implements IPipe { private final InputPort<T> target; private final int capacity; public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - super(sourcePort, targetPort); this.target = targetPort; this.capacity = capacity; sourcePort.setPipe(this); @@ -35,28 +34,88 @@ public class InstantiationPipe<T> extends AbstractIntraThreadPipe { return capacity; } - public InputPort<T> getTarget() { - return target; + @Override + public boolean add(final Object element) { + // TODO Auto-generated method stub + return false; } @Override - public boolean add(final Object element) { - throw new IllegalStateException("Should not be called"); + public boolean addNonBlocking(final Object element) { + // TODO Auto-generated method stub + return false; } @Override public boolean isEmpty() { - throw new IllegalStateException("Should not be called"); + // TODO Auto-generated method stub + return false; } @Override public int size() { - throw new IllegalStateException("Should not be called"); + // TODO Auto-generated method stub + return 0; } @Override public Object removeLast() { - throw new IllegalStateException("Should not be called"); + // TODO Auto-generated method stub + return null; + } + + @Override + public InputPort<?> getTargetPort() { + // TODO Auto-generated method stub + return this.target; + } + + @Override + public void sendSignal(final ISignal signal) { + // TODO Auto-generated method stub + + } + + @Override + public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + // TODO Auto-generated method stub + + } + + @Override + public void reportNewElement() { + // TODO Auto-generated method stub + + } + + @Override + public boolean isClosed() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean hasMore() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void waitForStartSignal() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void waitForInitializingSignal() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void close() { + // TODO Auto-generated method stub + } }