diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 04d2c800820963a3e5de448dfefec3103f387e23..2d4a24b2dab47abca571f1723b5d26ea8217cd75 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -33,7 +33,7 @@ abstract class AbstractRunnableStage implements Runnable { this.logger = LoggerFactory.getLogger(stage.getClass()); if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getRunnableCounter().inc(); + stage.owningContext.getRuntimeService().getRunnableCounter().inc(); } } @@ -63,7 +63,7 @@ abstract class AbstractRunnableStage implements Runnable { } } finally { if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getRunnableCounter().dec(); + stage.owningContext.getRuntimeService().getRunnableCounter().dec(); } } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 529164db49d9c1d7cc76c0f48b4a9b5eb6318b75..a1606ad66d260478ac431cd6750e41cba768f6cb 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,14 +15,12 @@ */ package teetime.framework; -import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.util.framework.concurrent.SignalingCounter; /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. @@ -36,18 +34,12 @@ final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private Map<Stage, String> threadableStages = new HashMap<Stage, String>(); - - private final SignalingCounter runnableCounter = new SignalingCounter(); + private ThreadService runtimeService = new ThreadService(); ConfigurationContext() {} Map<Stage, String> getThreadableStages() { - return this.threadableStages; - } - - SignalingCounter getRunnableCounter() { - return runnableCounter; + return runtimeService.getThreadableStages(); } /** @@ -55,9 +47,7 @@ final class ConfigurationContext { */ final void addThreadableStage(final Stage stage, final String threadName) { mergeContexts(stage); - if (this.threadableStages.put(stage, threadName) != null) { - LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); - } + runtimeService.addThreadableStage(stage, threadName); } /** @@ -65,7 +55,7 @@ final class ConfigurationContext { */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) { - if (!threadableStages.containsKey(sourcePort.getOwningStage())) { + if (!runtimeService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } } @@ -81,8 +71,8 @@ final class ConfigurationContext { final void mergeContexts(final Stage stage) { if (!stage.owningContext.equals(EMPTY_CONTEXT)) { if (stage.owningContext != this) { // Performance - this.threadableStages.putAll(stage.owningContext.threadableStages); - stage.owningContext.threadableStages = this.threadableStages; + this.runtimeService.getThreadableStages().putAll(stage.owningContext.getRuntimeService().getThreadableStages()); + stage.owningContext.getRuntimeService().setThreadableStages(this.getRuntimeService().getThreadableStages()); } } else { stage.owningContext = this; @@ -90,4 +80,12 @@ final class ConfigurationContext { } + public ThreadService getRuntimeService() { + return runtimeService; + } + + public void setRuntimeService(final ThreadService runtimeService) { + this.runtimeService = runtimeService; + } + } diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 3b843c1391f5d7a9a64fb5199ad8855623e29879..61a80824790339cf4b311aaac19246464e541096 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -15,13 +15,8 @@ */ package teetime.framework; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +24,8 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; -import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; -import teetime.util.ThreadThrowableContainer; /** * Represents an Execution to which stages can be added and executed later. @@ -48,7 +41,7 @@ import teetime.util.ThreadThrowableContainer; * * @since 2.0 */ -public final class Execution<T extends Configuration> implements UncaughtExceptionHandler { +public final class Execution<T extends Configuration> { private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); @@ -56,15 +49,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti private final IExceptionListenerFactory factory; - private boolean executionInterrupted = false; - - private final List<Thread> consumerThreads = new LinkedList<Thread>(); - private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); - private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); - - private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>(); - - private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); + private final boolean executionInterrupted = false; /** * Creates a new {@link Execution} that skips validating the port connections and uses the default listener. @@ -159,54 +144,12 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti initializeIntraStages(intraStages, thread, newListener); } - startThreads(this.consumerThreads); - startThreads(this.finiteProducerThreads); - startThreads(this.infiniteProducerThreads); - - sendInitializingSignal(); + getConfiguration().getContext().getRuntimeService().startThreads(); } private Thread initializeThreadableStages(final Stage stage) { - final Thread thread; - - final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); - switch (terminationStrategy) { - case BY_SIGNAL: { - final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - thread = createThread(runnable, stage.getId()); - this.consumerThreads.add(thread); - break; - } - case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.finiteProducerThreads.add(thread); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - break; - } - case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - this.infiniteProducerThreads.add(thread); - break; - } - default: - throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); - } - return thread; - } - - private Thread createThread(final AbstractRunnableStage runnable, final String name) { - final Thread thread = new Thread(runnable); - thread.setUncaughtExceptionHandler(this); - thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage)); - return thread; + return configuration.getContext().getRuntimeService().initializeThreadableStages(stage); } private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { @@ -225,37 +168,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti * @since 2.0 */ public void waitForTermination() { - try { - getConfiguration().getContext().getRunnableCounter().waitFor(0); - - // LOGGER.debug("Waiting for finiteProducerThreads"); - // for (Thread thread : this.finiteProducerThreads) { - // thread.join(); - // } - // - // LOGGER.debug("Waiting for consumerThreads"); - // for (Thread thread : this.consumerThreads) { - // thread.join(); - // } - } catch (InterruptedException e) { - LOGGER.error("Execution has stopped unexpectedly", e); - for (Thread thread : this.finiteProducerThreads) { - thread.interrupt(); - } - - for (Thread thread : this.consumerThreads) { - thread.interrupt(); - } - } - - LOGGER.debug("Interrupting infiniteProducerThreads..."); - for (Thread thread : this.infiniteProducerThreads) { - thread.interrupt(); - } - - if (!exceptions.isEmpty()) { - throw new ExecutionException(exceptions); - } + getConfiguration().getContext().getRuntimeService().waitForTermination(); } // TODO: implement @@ -286,25 +199,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti * @since 2.0 */ public void executeNonBlocking() { - sendStartingSignal(); - } - - private void startThreads(final Iterable<Thread> threads) { - for (Thread thread : threads) { - thread.start(); - } - } - - private void sendInitializingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerInitializingSignal(); - } - } - - private void sendStartingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerStartingSignal(); - } + configuration.getContext().getRuntimeService().executeNonBlocking(); } /** @@ -316,22 +211,6 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti return this.configuration; } - @Override - public void uncaughtException(final Thread thread, final Throwable throwable) { - if (!executionInterrupted) { - executionInterrupted = true; - LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getContext().getThreadableStages().keySet()) { - if (stage.getOwningThread() != thread) { - if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { - stage.terminate(); - } - } - } - } - this.exceptions.add(ThreadThrowableContainer.of(thread, throwable)); - } - private Set<Stage> traverseIntraStages(final Stage stage) { final Traversor traversor = new Traversor(new IntraStageCollector()); traversor.traverse(stage); @@ -344,7 +223,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti * * @since 2.0 */ - public IExceptionListenerFactory getFactory() { + public IExceptionListenerFactory getExceptionListenerFactory() { return factory; } } diff --git a/src/main/java/teetime/framework/IService.java b/src/main/java/teetime/framework/IService.java new file mode 100644 index 0000000000000000000000000000000000000000..8f5f6935ebe04d4f3eefc20f22cce1c29c3a5359 --- /dev/null +++ b/src/main/java/teetime/framework/IService.java @@ -0,0 +1,7 @@ +package teetime.framework; + +public interface IService { + + public void merge(IService first, IService second); + +} diff --git a/src/main/java/teetime/framework/OldExecution.java b/src/main/java/teetime/framework/OldExecution.java new file mode 100644 index 0000000000000000000000000000000000000000..8429fc1e5ab134acae27c0e31915ad9be27608aa --- /dev/null +++ b/src/main/java/teetime/framework/OldExecution.java @@ -0,0 +1,334 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework; + +import java.lang.Thread.UncaughtExceptionHandler; + +/** + * Represents an Execution to which stages can be added and executed later. + * This needs a {@link Configuration}, + * in which the adding and configuring of stages takes place. + * To start the analysis {@link #executeBlocking()} needs to be executed. + * This class will automatically create threads and join them without any further commitment. + * + * @author Christian Wulf, Nelson Tavares de Sousa + * + * @param <T> + * the type of the {@link Configuration} + * + * @since 2.0 + */ +public abstract class OldExecution<T extends Configuration> implements UncaughtExceptionHandler { + // + // private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); + // + // private final T configuration; + // + // private final IExceptionListenerFactory factory; + // + // private boolean executionInterrupted = false; + // + // private final List<Thread> consumerThreads = new LinkedList<Thread>(); + // private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); + // private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); + // + // private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>(); + // + // private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); + // + // /** + // * Creates a new {@link Execution} that skips validating the port connections and uses the default listener. + // * + // * @param configuration + // * to be used for the analysis + // */ + // public OldExecution(final T configuration) { + // this(configuration, false); + // } + // + // /** + // * Creates a new {@link Execution} that uses the default listener. + // * + // * @param configuration + // * to be used for the analysis + // * @param validationEnabled + // * whether or not the validation should be executed + // */ + // public OldExecution(final T configuration, final boolean validationEnabled) { + // this(configuration, validationEnabled, new TerminatingExceptionListenerFactory()); + // } + // + // /** + // * Creates a new {@link Execution} that skips validating the port connections and uses a specific listener. + // * + // * @param configuration + // * to be used for the analysis + // * @param factory + // * specific listener for the exception handling + // */ + // public OldExecution(final T configuration, final IExceptionListenerFactory factory) { + // this(configuration, false, factory); + // } + // + // /** + // * Creates a new {@link Execution} that uses a specific listener. + // * + // * @param configuration + // * to be used for the analysis + // * @param validationEnabled + // * whether or not the validation should be executed + // * @param factory + // * specific listener for the exception handling + // */ + // public OldExecution(final T configuration, final boolean validationEnabled, final IExceptionListenerFactory factory) { + // this.configuration = configuration; + // this.factory = factory; + // if (configuration.isExecuted()) { + // throw new IllegalStateException("Configuration was already executed"); + // } + // configuration.setExecuted(true); + // if (validationEnabled) { + // validateStages(); + // } + // init(); + // } + // + // // BETTER validate concurrently + // private void validateStages() { + // final Map<Stage, String> threadableStageJobs = this.configuration.getContext().getThreadableStages(); + // for (Stage stage : threadableStageJobs.keySet()) { + // // // portConnectionValidator.validate(stage); + // // } + // + // final ValidatingSignal validatingSignal = new ValidatingSignal(); + // stage.onSignal(validatingSignal, null); + // if (validatingSignal.getInvalidPortConnections().size() > 0) { + // throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections()); + // } + // } + // } + // + // /** + // * This initializes the analysis and needs to be run right before starting it. + // * + // */ + // private final void init() { + // ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); + // executionInstantiation.instantiatePipes(); + // + // final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet(); + // if (threadableStageJobs.isEmpty()) { + // throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); + // } + // + // for (Stage stage : threadableStageJobs) { + // final Thread thread = initializeThreadableStages(stage); + // + // final Set<Stage> intraStages = traverseIntraStages(stage); + // final AbstractExceptionListener newListener = factory.createInstance(); + // initializeIntraStages(intraStages, thread, newListener); + // } + // + // startThreads(this.consumerThreads); + // startThreads(this.finiteProducerThreads); + // startThreads(this.infiniteProducerThreads); + // + // sendInitializingSignal(); + // + // } + // + // private Thread initializeThreadableStages(final Stage stage) { + // final Thread thread; + // + // final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); + // switch (terminationStrategy) { + // case BY_SIGNAL: { + // final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); + // thread = createThread(runnable, stage.getId()); + // this.consumerThreads.add(thread); + // break; + // } + // case BY_SELF_DECISION: { + // final RunnableProducerStage runnable = new RunnableProducerStage(stage); + // producerRunnables.add(runnable); + // thread = createThread(runnable, stage.getId()); + // this.finiteProducerThreads.add(thread); + // InitializingSignal initializingSignal = new InitializingSignal(); + // stage.onSignal(initializingSignal, null); + // break; + // } + // case BY_INTERRUPT: { + // final RunnableProducerStage runnable = new RunnableProducerStage(stage); + // producerRunnables.add(runnable); + // thread = createThread(runnable, stage.getId()); + // InitializingSignal initializingSignal = new InitializingSignal(); + // stage.onSignal(initializingSignal, null); + // this.infiniteProducerThreads.add(thread); + // break; + // } + // default: + // throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); + // } + // return thread; + // } + // + // private Thread createThread(final AbstractRunnableStage runnable, final String name) { + // final Thread thread = new Thread(runnable); + // thread.setUncaughtExceptionHandler(this); + // thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage)); + // return thread; + // } + // + // private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { + // for (Stage intraStage : intraStages) { + // intraStage.setOwningThread(thread); + // intraStage.setExceptionHandler(newListener); + // } + // } + // + // /** + // * Calling this method will block the current thread until the execution terminates. + // * + // * @throws ExecutionException + // * if at least one exception in one thread has occurred within the execution. The exception contains the pairs of thread and throwable + // * + // * @since 2.0 + // */ + // public void waitForTermination() { + // try { + // getConfiguration().getContext().getRunnableCounter().waitFor(0); + // + // // LOGGER.debug("Waiting for finiteProducerThreads"); + // // for (Thread thread : this.finiteProducerThreads) { + // // thread.join(); + // // } + // // + // // LOGGER.debug("Waiting for consumerThreads"); + // // for (Thread thread : this.consumerThreads) { + // // thread.join(); + // // } + // } catch (InterruptedException e) { + // LOGGER.error("Execution has stopped unexpectedly", e); + // for (Thread thread : this.finiteProducerThreads) { + // thread.interrupt(); + // } + // + // for (Thread thread : this.consumerThreads) { + // thread.interrupt(); + // } + // } + // + // LOGGER.debug("Interrupting infiniteProducerThreads..."); + // for (Thread thread : this.infiniteProducerThreads) { + // thread.interrupt(); + // } + // + // if (!exceptions.isEmpty()) { + // throw new ExecutionException(exceptions); + // } + // } + // + // // TODO: implement + // private void abortEventually() { + // for (Stage stage : configuration.getContext().getThreadableStages().keySet()) { + // stage.terminate(); + // } + // waitForTermination(); + // } + // + // /** + // * This method will start this execution and block until it is finished. + // * + // * @throws ExecutionException + // * if at least one exception in one thread has occurred within the execution. The exception contains the pairs of thread and throwable. + // * + // * @since 2.0 + // */ + // public void executeBlocking() { + // executeNonBlocking(); + // waitForTermination(); + // } + // + // /** + // * This method starts this execution without waiting for its termination. The method {@link #waitForTermination()} must be called to unsure a correct + // termination + // * of the execution. + // * + // * @since 2.0 + // */ + // public void executeNonBlocking() { + // sendStartingSignal(); + // } + // + // private void startThreads(final Iterable<Thread> threads) { + // for (Thread thread : threads) { + // thread.start(); + // } + // } + // + // private void sendInitializingSignal() { + // for (RunnableProducerStage runnable : producerRunnables) { + // runnable.triggerInitializingSignal(); + // } + // } + // + // private void sendStartingSignal() { + // for (RunnableProducerStage runnable : producerRunnables) { + // runnable.triggerStartingSignal(); + // } + // } + // + // /** + // * Retrieves the Configuration which was used to add and arrange all stages needed for this execution. + // * + // * @return the configuration used for this execution + // */ + // public T getConfiguration() { + // return this.configuration; + // } + // + // @Override + // public void uncaughtException(final Thread thread, final Throwable throwable) { + // if (!executionInterrupted) { + // executionInterrupted = true; + // LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); + // for (Stage stage : configuration.getContext().getThreadableStages().keySet()) { + // if (stage.getOwningThread() != thread) { + // if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { + // stage.terminate(); + // } + // } + // } + // } + // this.exceptions.add(ThreadThrowableContainer.of(thread, throwable)); + // } + // + // private Set<Stage> traverseIntraStages(final Stage stage) { + // final Traversor traversor = new Traversor(new IntraStageCollector()); + // traversor.traverse(stage); + // return traversor.getVisitedStage(); + // } + // + // /** + // * @return + // * the given ExceptionListenerFactory instance + // * + // * @since 2.0 + // */ + // public IExceptionListenerFactory getExceptionListenerFactory() { + // return factory; + // } +} diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index c4194b4ee853018f14fae0c5766aa28921c8f193..deab96c48bbcd3b092dbf07e7c5aaaf3b04a3f33 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -1,33 +1,161 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package teetime.framework; -public class ThreadService { +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; - public Runnable startWithinNewThread(final Stage stage) { - Runnable runnable = wrap(stage); - Thread thread = new Thread(runnable); - thread.start(); - return runnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.signal.InitializingSignal; +import teetime.util.ThreadThrowableContainer; +import teetime.util.framework.concurrent.SignalingCounter; + +class ThreadService implements IService { + + private Map<Stage, String> threadableStages = new HashMap<Stage, String>(); + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadService.class); + + private final List<Thread> consumerThreads = new LinkedList<Thread>(); + private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); + private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); + + private final SignalingCounter runnableCounter = new SignalingCounter(); + + SignalingCounter getRunnableCounter() { + return runnableCounter; } - private AbstractRunnableStage wrap(final Stage stage) { - if (stage.getInputPorts().size() > 0) { - return new RunnableConsumerStage(stage); + private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>(); + + private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); + + Thread initializeThreadableStages(final Stage stage) { + final Thread thread; + + final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); + switch (terminationStrategy) { + case BY_SIGNAL: { + final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); + thread = createThread(runnable, stage.getId()); + this.consumerThreads.add(thread); + break; + } + case BY_SELF_DECISION: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); + thread = createThread(runnable, stage.getId()); + this.finiteProducerThreads.add(thread); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + break; + } + case BY_INTERRUPT: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); + thread = createThread(runnable, stage.getId()); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + this.infiniteProducerThreads.add(thread); + break; + } + default: + throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); } - return new RunnableProducerStage(stage); + return thread; } + + private Thread createThread(final AbstractRunnableStage runnable, final String name) { + final Thread thread = new Thread(runnable); + thread.setName(threadableStages.get(runnable.stage)); + return thread; + } + + public void addThreadableStage(final Stage stage, final String threadName) { + if (this.threadableStages.put(stage, threadName) != null) { + LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); + } + } + + void waitForTermination() { + try { + runnableCounter.waitFor(0); + + // LOGGER.debug("Waiting for finiteProducerThreads"); + // for (Thread thread : this.finiteProducerThreads) { + // thread.join(); + // } + // + // LOGGER.debug("Waiting for consumerThreads"); + // for (Thread thread : this.consumerThreads) { + // thread.join(); + // } + } catch (InterruptedException e) { + LOGGER.error("Execution has stopped unexpectedly", e); + for (Thread thread : this.finiteProducerThreads) { + thread.interrupt(); + } + + for (Thread thread : this.consumerThreads) { + thread.interrupt(); + } + } + + LOGGER.debug("Interrupting infiniteProducerThreads..."); + for (Thread thread : this.infiniteProducerThreads) { + thread.interrupt(); + } + + if (!exceptions.isEmpty()) { + throw new ExecutionException(exceptions); + } + } + + void executeNonBlocking() { + sendStartingSignal(); + } + + void startThreads() { + startThreads(this.consumerThreads); + startThreads(this.finiteProducerThreads); + startThreads(this.infiniteProducerThreads); + + sendInitializingSignal(); + } + + private void startThreads(final Iterable<Thread> threads) { + for (Thread thread : threads) { + thread.start(); + } + } + + private void sendInitializingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + runnable.triggerInitializingSignal(); + } + } + + private void sendStartingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + runnable.triggerStartingSignal(); + } + } + + @Override + public void merge(final IService first, final IService second) { + + } + + public Map<Stage, String> getThreadableStages() { + return threadableStages; + } + + void setThreadableStages(final Map<Stage, String> threadableStages) { + this.threadableStages = threadableStages; + } + } diff --git a/src/main/java/teetime/framework/ThreadServiceB.java b/src/main/java/teetime/framework/ThreadServiceB.java new file mode 100644 index 0000000000000000000000000000000000000000..aca4cba953e6db6115f0bb094b2d22170e36bde1 --- /dev/null +++ b/src/main/java/teetime/framework/ThreadServiceB.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework; + +public class ThreadServiceB { + + public Runnable startWithinNewThread(final Stage stage) { + Runnable runnable = wrap(stage); + Thread thread = new Thread(runnable); + thread.start(); + return runnable; + } + + private AbstractRunnableStage wrap(final Stage stage) { + if (stage.getInputPorts().size() > 0) { + return new RunnableConsumerStage(stage); + } + return new RunnableProducerStage(stage); + } +} diff --git a/src/main/java/teetime/framework/service/RuntimeService.java b/src/main/java/teetime/framework/service/RuntimeService.java deleted file mode 100644 index 73d1af0ccfcd3fa4b13bb285bbc288a4b16c3a0e..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/service/RuntimeService.java +++ /dev/null @@ -1,5 +0,0 @@ -package teetime.framework.service; - -public class RuntimeService { - -}