diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 50e4f96f2b963cb38de20df715375b45829d04ad..a8fc0514bf9697495140cb317db28cace9af5ec8 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -17,10 +17,8 @@ package teetime.framework; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -30,14 +28,9 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; -import teetime.util.Connection; import teetime.util.Pair; /** @@ -68,13 +61,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); - private boolean initialized; - - private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private int createdConnections = 0; - private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); /** @@ -114,7 +100,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); // } @@ -132,10 +118,10 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * */ private final void init() { + AnalysisInstantiation analysisInstantiation = new AnalysisInstantiation(configuration); + analysisInstantiation.instantiatePipes(); - instantiatePipes(); - - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -191,50 +177,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught return thread; } - private void instantiatePipes() { - Integer i = new Integer(0); - Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Stage threadableStage : threadableStageJobs) { - i++; - colors.put(threadableStage, i); // Markiere den threadHead - colorAndConnectStages(i, colors, threadableStage); - } - if (configuration.getConnections().size() != createdConnections) { - throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); - } - } - - public void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) { - Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Connection connection : configuration.getConnections()) { - if (connection.getSourcePort().getOwningStage() == threadableStage) { - Stage targetStage = connection.getTargetPort().getOwningStage(); - Integer targetColor = new Integer(0); - if (colors.containsKey(targetStage)) { - targetColor = colors.get(targetStage); - } - if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { - if (connection.getCapacity() != 0) { - interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity()); - } else { - interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4); - } - } else { - if (colors.containsKey(targetStage)) { - if (!colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") - } - } - intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); - colors.put(targetStage, i); - colorAndConnectStages(i, colors, targetStage); - } - createdConnections++; - } - } - } - private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); @@ -356,7 +298,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getThreadableStageJobs()) { + for (Stage stage : configuration.getThreadableStages()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index f9b14001a97abb2e3c86d3a487052618461a016b..0d360b5b163b06a2d1c4b871ddd2d0a87749bb62 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -20,10 +20,10 @@ import java.util.Set; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; -import teetime.util.Connection; /** * Represents a configuration of connected stages, which is needed to run a analysis. @@ -31,8 +31,7 @@ import teetime.util.Connection; */ public abstract class AnalysisConfiguration { - private final Set<Stage> threadableStageJobs = new HashSet<Stage>(); - private final Set<Connection<?>> connections = new HashSet<Connection<?>>(); + private final Set<Stage> threadableStages = new HashSet<Stage>(); @SuppressWarnings("deprecation") private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; @@ -50,8 +49,8 @@ public abstract class AnalysisConfiguration { */ private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); - Set<Stage> getThreadableStageJobs() { - return this.threadableStageJobs; + Set<Stage> getThreadableStages() { + return this.threadableStages; } /** @@ -60,8 +59,8 @@ public abstract class AnalysisConfiguration { * @param stage * A arbitrary stage, which will be added to the configuration and executed in a thread. */ - protected void addThreadableStage(final Stage stage) { - this.threadableStageJobs.add(stage); + protected final void addThreadableStage(final Stage stage) { + this.threadableStages.add(stage); } /** @@ -70,10 +69,9 @@ public abstract class AnalysisConfiguration { * @param stage * A arbitrary CompositeStage, which will be added to the configuration and executed in a thread. */ - protected void addThreadableStage(final AbstractCompositeStage stage) { - this.threadableStageJobs.add(stage.getFirstStage()); - this.connections.addAll(stage.getConnections()); - for (Stage threadableStage : stage.getThreadableStageJobs()) { + protected final void addThreadableStage(final AbstractCompositeStage stage) { + this.threadableStages.add(stage.getFirstStage()); + for (Stage threadableStage : stage.getThreadableStages()) { this.addThreadableStage(threadableStage); } } @@ -187,7 +185,7 @@ public abstract class AnalysisConfiguration { * @param <T> * the type of elements to be sent */ - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { connectPorts(sourcePort, targetPort, 4); } @@ -203,17 +201,8 @@ public abstract class AnalysisConfiguration { * @param <T> * the type of elements to be sent */ - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - connections.add(new Connection<T>(sourcePort, targetPort, capacity)); - } - - /** - * Returns a list of pairs, which describe the connections among all stages. - * - * @return a list of pairs of Out- and InputPorts, which are connected - */ - protected Set<Connection<?>> getConnections() { - return connections; + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + new InstantiationPipe(sourcePort, targetPort, capacity); } } diff --git a/src/main/java/teetime/framework/AnalysisInstantiation.java b/src/main/java/teetime/framework/AnalysisInstantiation.java new file mode 100644 index 0000000000000000000000000000000000000000..d1283e22f181038399fd2bfb9f770bb1da8594be --- /dev/null +++ b/src/main/java/teetime/framework/AnalysisInstantiation.java @@ -0,0 +1,80 @@ +package teetime.framework; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; + +class AnalysisInstantiation { + + private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisInstantiation.class); + + private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + + private final AnalysisConfiguration configuration; + + public AnalysisInstantiation(final AnalysisConfiguration configuration) { + this.configuration = configuration; + } + + @SuppressWarnings("rawtypes") + Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final AnalysisConfiguration configuration) { + Integer createdConnections = new Integer(0); + Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + for (OutputPort outputPort : threadableStage.getOutputPorts()) { + if (outputPort.pipe != null) { + if (outputPort.pipe instanceof InstantiationPipe) { + InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; + Stage targetStage = pipe.getTargetPort().getOwningStage(); + Integer targetColor = new Integer(0); + if (colors.containsKey(targetStage)) { + targetColor = colors.get(targetStage); + } + if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { + if (pipe.getCapacity() != 0) { + interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); + } else { + interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + } + } else { + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(i)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + colors.put(targetStage, i); + createdConnections += colorAndConnectStages(i, colors, targetStage, configuration); + } + createdConnections++; + } + } + + } + return createdConnections; + } + + void instantiatePipes() { + Integer i = new Integer(0); + Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Integer createdConnections = 0; + for (Stage threadableStage : threadableStageJobs) { + i++; + colors.put(threadableStage, i); + createdConnections = colorAndConnectStages(i, colors, threadableStage, configuration); + } + LOGGER.debug("Created " + createdConnections + "connections"); + } + +} diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..702de98809cd9659fc4c2753785753460e7a1dd5 --- /dev/null +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework.pipe; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.signal.ISignal; + +public class InstantiationPipe implements IPipe { + + private final InputPort target; + private final int capacity; + + public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + this.target = targetPort; + this.capacity = capacity; + sourcePort.setPipe(this); + } + + public int getCapacity() { + return capacity; + } + + @Override + public boolean add(final Object element) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean addNonBlocking(final Object element) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isEmpty() { + // TODO Auto-generated method stub + return false; + } + + @Override + public int size() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Object removeLast() { + // TODO Auto-generated method stub + return null; + } + + @Override + public InputPort<?> getTargetPort() { + // TODO Auto-generated method stub + return this.target; + } + + @Override + public void sendSignal(final ISignal signal) { + // TODO Auto-generated method stub + + } + + @Override + public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + // TODO Auto-generated method stub + + } + + @Override + public void reportNewElement() { + // TODO Auto-generated method stub + + } + + @Override + public boolean isClosed() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean hasMore() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void waitForStartSignal() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void waitForInitializingSignal() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + +} diff --git a/src/main/java/teetime/util/Connection.java b/src/main/java/teetime/util/Connection.java deleted file mode 100644 index f110c3da3e172013f17ea9f4d42ebc0178e2b222..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/util/Connection.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.util; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public class Connection<T> { - - private final OutputPort<? extends T> sourcePort; - private final InputPort<T> targetPort; - private final int capacity; - - public Connection(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - this(sourcePort, targetPort, 4); - } - - public Connection(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - this.sourcePort = sourcePort; - this.targetPort = targetPort; - this.capacity = capacity; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((sourcePort == null) ? 0 : sourcePort.hashCode()); - result = prime * result + ((targetPort == null) ? 0 : targetPort.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - Connection<?> other = (Connection<?>) obj; - if (sourcePort == null) { - if (other.sourcePort != null) { - return false; - } - } else if (!sourcePort.equals(other.sourcePort)) { - return false; - } - if (targetPort == null) { - if (other.targetPort != null) { - return false; - } - } else if (!targetPort.equals(other.targetPort)) { - return false; - } - return true; - } - - public int getCapacity() { - return capacity; - } - - public OutputPort<? extends T> getSourcePort() { - return sourcePort; - } - - public InputPort<T> getTargetPort() { - return targetPort; - } - -}