diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index ab8d1dae94909a75c1254a40121c64392c78ad54..96b71bb44ff3d4f9e3d5964a5cdad53f233bfff8 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -58,10 +58,10 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); - for (OutputPort<?> outputPort : outputPorts) { outputPort.sendSignal(signal); } + } } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ab2a1d2c6d2835e0bdba69fe245a04c7b9ca9f73..ea357146a290b441c56b5b2a3764e1a5b41c8b74 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; +import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; @@ -140,18 +141,21 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught final RunnableProducerStage runnable = new RunnableProducerStage(stage); thread = createThread(runnable, stage.getId()); this.finiteProducerThreads.add(thread); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); break; } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); thread = createThread(runnable, stage.getId()); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); this.infiniteProducerThreads.add(thread); break; } default: throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); } - final Set<Stage> intraStages = traverseIntraStages(stage); final AbstractExceptionListener newListener = factory.createInstance(); initializeIntraStages(intraStages, thread, newListener); @@ -170,11 +174,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); intraStage.setExceptionHandler(newListener); - try { - intraStage.onInitializing(); - } catch (Exception e) { // NOPMD(generic framework catch) - throw new IllegalStateException("The following exception occurs within initializing the analysis:", e); - } } } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 5a8e39643208d88882ca578bd0a6475813fb1e16..ded489ebed6581caff6da9b0cc4ae39c82fb20cb 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -47,6 +47,9 @@ final class RunnableConsumerStage extends AbstractRunnableStage { for (InputPort<?> inputPort : inputPorts) { inputPort.waitForStartSignal(); } + for (InputPort<?> inputPort : inputPorts) { + inputPort.waitForStartSignal(); + } logger.trace("Starting..." + stage); } diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java new file mode 100644 index 0000000000000000000000000000000000000000..bb631edcf9aac720b8bb9ff64833dbc34044eef1 --- /dev/null +++ b/src/main/java/teetime/framework/signal/InitializingSignal.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework.signal; + +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.Stage; + +public final class InitializingSignal implements ISignal { + + private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class); + private final List<Exception> catchedExceptions = new LinkedList<Exception>(); + + public InitializingSignal() {} + + @Override + public void trigger(final Stage stage) { + try { + stage.onInitializing(); + } catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception) + this.catchedExceptions.add(e); + LOGGER.error("Exception while sending the start signal", e); + } + } + + public List<Exception> getCatchedExceptions() { + return this.catchedExceptions; + } + +} diff --git a/src/main/java/teetime/stage/MultipleInstanceOfFilter.java b/src/main/java/teetime/stage/MultipleInstanceOfFilter.java index 46a363af2fd5f6c9162a83deab064cb958a50597..42f0d22ba131b5075b6799269ce7ca1c09ca3b78 100644 --- a/src/main/java/teetime/stage/MultipleInstanceOfFilter.java +++ b/src/main/java/teetime/stage/MultipleInstanceOfFilter.java @@ -42,7 +42,6 @@ public final class MultipleInstanceOfFilter<I> extends AbstractConsumerStage<I> @SuppressWarnings("unchecked") public void onStarting() throws Exception { super.onStarting(); - // We cache the map to avoid the creating of iterators during runtime cachedOutputPortsMap = (Entry<Class<? extends I>, OutputPort<? super I>>[]) outputPortsMap.entrySet().toArray(new Entry<?, ?>[outputPortsMap.size()]); } diff --git a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java index 3654feddc788773efc5f7fd7401b626c9b1ff3e9..6221ba7733486296626f1b55ea5fb486d2db96c6 100644 --- a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java @@ -16,7 +16,9 @@ package teetime.stage; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import java.util.ArrayList; @@ -42,6 +44,7 @@ public class MultipleInstanceOfFilterTest { StageTester.test(filter).and().send(input).to(filter.getInputPort()).and().receive(result).from(filter.getOutputPortForType(String.class)).start(); + assertThat(result, is(not(empty()))); assertThat(result, contains("1", "2", "3")); }