Skip to content
Snippets Groups Projects
Commit 41f0a1a9 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

fixes #220

signals get checked, if they should be passed on (merger logic moved to
AbstractStage)
parent 24f691d7
No related branches found
No related tags found
No related merge requests found
......@@ -15,8 +15,10 @@
*/
package teetime.framework;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import teetime.framework.pipe.IPipe;
......@@ -26,6 +28,7 @@ import teetime.util.framework.port.PortList;
import teetime.util.framework.port.PortRemovedListener;
public abstract class AbstractStage extends Stage {
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
......@@ -54,7 +57,20 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> signalReceivedInputPorts;
signalReceivedInputPorts = signalMap.get(signalClass);
if (signalReceivedInputPorts == null) {
signalReceivedInputPorts = new HashSet<InputPort<?>>();
signalMap.put(signalClass, signalReceivedInputPorts);
}
if (!signalReceivedInputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
return;
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
try {
signal.trigger(this);
} catch (Exception e) {
......
......@@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal {
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return receivedInputPorts.size() == allInputPorts.size();
return receivedInputPorts.size() >= allInputPorts.size();
}
}
......@@ -16,7 +16,6 @@
package teetime.stage.basic.merger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -66,42 +65,6 @@ public class Merger<T> extends AbstractStage {
outputPort.send(token);
}
/**
* This method is executed, if a signal is sent to a instance of this class.
* Multiple signals of one certain type are ignored, if they are sent to same port.
* Hence a signal is only passed on, when it arrived on all input ports, regardless how often.
*
* @param signal
* Signal which is sent
*
* @param inputPort
* The port which the signal was sent to
*/
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (logger.isTraceEnabled()) {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
}
Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> signalReceivedInputPorts;
if (signalMap.containsKey(signalClass)) {
signalReceivedInputPorts = signalMap.get(signalClass);
} else {
signalReceivedInputPorts = new HashSet<InputPort<?>>();
signalMap.put(signalClass, signalReceivedInputPorts);
}
if (!signalReceivedInputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
super.onSignal(signal, inputPort);
}
}
public IMergerStrategy getMergerStrategy() {
return this.strategy;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment