diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 6496217823a2f07bbff3fe2f16c83472b803db13..447b243c8a0d8d1fd709304281de9d62e56478e3 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -15,8 +15,10 @@ */ package teetime.framework; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import teetime.framework.pipe.IPipe; @@ -27,6 +29,7 @@ import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { + private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); @@ -54,7 +57,21 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - if (!this.signalAlreadyReceived(signal, inputPort)) { + Class<? extends ISignal> signalClass = signal.getClass(); + + Set<InputPort<?>> signalReceivedInputPorts; + if (signalMap.containsKey(signalClass)) { + signalReceivedInputPorts = signalMap.get(signalClass); + } else { + signalReceivedInputPorts = new HashSet<InputPort<?>>(); + signalMap.put(signalClass, signalReceivedInputPorts); + } + + if (!signalReceivedInputPorts.add(inputPort)) { + this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); + return; + } + if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) { try { signal.trigger(this); } catch (Exception e) { diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index f8e001592ba6250696beef8201c61fd8c64307da..38ad2ce4d4011c0f36adfe7ee583e1fd3932f324 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -30,7 +30,7 @@ public final class StartingSignal implements ISignal { @Override public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return true; + return receivedInputPorts.size() == 1; } } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index f17b532a66bb26b634f5de32d35707e5a2249a32..211c57bb42fc7ad6deea6f5e5ad2818dbdbd906f 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal { @Override public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return receivedInputPorts.size() == allInputPorts.size(); + return receivedInputPorts.size() >= allInputPorts.size(); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 89bc76226e56997b30506d46edb82d704d58fe7f..1ed92585e37423346a2a4ee319f03d7872835c87 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -15,11 +15,7 @@ */ package teetime.stage.basic.merger; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import teetime.framework.AbstractStage; import teetime.framework.InputPort; @@ -42,7 +38,6 @@ import teetime.stage.basic.merger.strategy.RoundRobinStrategy; */ public class Merger<T> extends AbstractStage { - private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; private final OutputPort<T> outputPort = this.createOutputPort(); private final IMergerStrategy strategy; @@ -52,7 +47,6 @@ public class Merger<T> extends AbstractStage { } public Merger(final IMergerStrategy strategy) { - this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); this.strategy = strategy; addInputPortRemovedListener(strategy); } @@ -66,42 +60,6 @@ public class Merger<T> extends AbstractStage { outputPort.send(token); } - /** - * This method is executed, if a signal is sent to a instance of this class. - * Multiple signals of one certain type are ignored, if they are sent to same port. - * Hence a signal is only passed on, when it arrived on all input ports, regardless how often. - * - * @param signal - * Signal which is sent - * - * @param inputPort - * The port which the signal was sent to - */ - @Override - public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - if (logger.isTraceEnabled()) { - this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); - } - - Class<? extends ISignal> signalClass = signal.getClass(); - - Set<InputPort<?>> signalReceivedInputPorts; - if (signalMap.containsKey(signalClass)) { - signalReceivedInputPorts = signalMap.get(signalClass); - } else { - signalReceivedInputPorts = new HashSet<InputPort<?>>(); - signalMap.put(signalClass, signalReceivedInputPorts); - } - - if (!signalReceivedInputPorts.add(inputPort)) { - this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); - } - - if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) { - super.onSignal(signal, inputPort); - } - } - public IMergerStrategy getMergerStrategy() { return this.strategy; }