Skip to content
Snippets Groups Projects
Commit b3cfcaa0 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'mayBeTriggered' into 'master'

myBeTriggered in AbstractStage.onSignal

See merge request !52
parents eac79789 58e50f31
No related branches found
No related tags found
No related merge requests found
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
...@@ -27,6 +29,7 @@ import teetime.util.framework.port.PortRemovedListener; ...@@ -27,6 +29,7 @@ import teetime.util.framework.port.PortRemovedListener;
public abstract class AbstractStage extends Stage { public abstract class AbstractStage extends Stage {
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>();
...@@ -54,7 +57,21 @@ public abstract class AbstractStage extends Stage { ...@@ -54,7 +57,21 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override @Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) { Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> signalReceivedInputPorts;
if (signalMap.containsKey(signalClass)) {
signalReceivedInputPorts = signalMap.get(signalClass);
} else {
signalReceivedInputPorts = new HashSet<InputPort<?>>();
signalMap.put(signalClass, signalReceivedInputPorts);
}
if (!signalReceivedInputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
return;
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
try { try {
signal.trigger(this); signal.trigger(this);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -30,7 +30,7 @@ public final class StartingSignal implements ISignal { ...@@ -30,7 +30,7 @@ public final class StartingSignal implements ISignal {
@Override @Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return true; return receivedInputPorts.size() == 1;
} }
} }
...@@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal { ...@@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal {
@Override @Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return receivedInputPorts.size() == allInputPorts.size(); return receivedInputPorts.size() >= allInputPorts.size();
} }
} }
...@@ -15,11 +15,7 @@ ...@@ -15,11 +15,7 @@
*/ */
package teetime.stage.basic.merger; package teetime.stage.basic.merger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import teetime.framework.AbstractStage; import teetime.framework.AbstractStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
...@@ -42,7 +38,6 @@ import teetime.stage.basic.merger.strategy.RoundRobinStrategy; ...@@ -42,7 +38,6 @@ import teetime.stage.basic.merger.strategy.RoundRobinStrategy;
*/ */
public class Merger<T> extends AbstractStage { public class Merger<T> extends AbstractStage {
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap;
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
private final IMergerStrategy strategy; private final IMergerStrategy strategy;
...@@ -52,7 +47,6 @@ public class Merger<T> extends AbstractStage { ...@@ -52,7 +47,6 @@ public class Merger<T> extends AbstractStage {
} }
public Merger(final IMergerStrategy strategy) { public Merger(final IMergerStrategy strategy) {
this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
this.strategy = strategy; this.strategy = strategy;
addInputPortRemovedListener(strategy); addInputPortRemovedListener(strategy);
} }
...@@ -66,42 +60,6 @@ public class Merger<T> extends AbstractStage { ...@@ -66,42 +60,6 @@ public class Merger<T> extends AbstractStage {
outputPort.send(token); outputPort.send(token);
} }
/**
* This method is executed, if a signal is sent to a instance of this class.
* Multiple signals of one certain type are ignored, if they are sent to same port.
* Hence a signal is only passed on, when it arrived on all input ports, regardless how often.
*
* @param signal
* Signal which is sent
*
* @param inputPort
* The port which the signal was sent to
*/
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (logger.isTraceEnabled()) {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
}
Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> signalReceivedInputPorts;
if (signalMap.containsKey(signalClass)) {
signalReceivedInputPorts = signalMap.get(signalClass);
} else {
signalReceivedInputPorts = new HashSet<InputPort<?>>();
signalMap.put(signalClass, signalReceivedInputPorts);
}
if (!signalReceivedInputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
super.onSignal(signal, inputPort);
}
}
public IMergerStrategy getMergerStrategy() { public IMergerStrategy getMergerStrategy() {
return this.strategy; return this.strategy;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment