Skip to content
Snippets Groups Projects
Commit 918a8fbb authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed signal handling in merger;

refactored MergerSignalTest
parent 8e647783
No related branches found
No related tags found
No related merge requests found
Showing with 118 additions and 110 deletions
......@@ -29,7 +29,7 @@ public abstract class AbstractStage extends Stage {
private static final IPipe DUMMY_PORT = new DummyPipe();
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
private InputPort<?>[] inputPorts = new InputPort<?>[0];
private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
......@@ -73,16 +73,16 @@ public abstract class AbstractStage extends Stage {
* @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise
*/
protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) {
boolean signalAlreadyReceived = this.triggeredSignals.contains(signal);
boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass());
if (signalAlreadyReceived) {
if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " again from input port: " + inputPort);
logger.trace("Got signal again: " + signal + " from input port: " + inputPort);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " from input port: " + inputPort);
}
this.triggeredSignals.add(signal);
this.triggeredSignalTypes.add(signal.getClass());
}
return signalAlreadyReceived;
}
......
......@@ -69,5 +69,4 @@ public final class OutputPort<T> extends AbstractPort<T> {
}
pipe.sendSignal(signal);
}
}
......@@ -15,9 +15,22 @@
*/
package teetime.framework.signal;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public interface ISignal {
void trigger(Stage stage);
/**
* Used by the merger only (so far)
*
* @param receivedInputPorts
* @param allInputPorts
* @param stageState
* @return <code>true</code> iff the signal may be triggered, <code>false</code> otherwise.
*/
boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, InputPort<?>[] allInputPorts);
}
......@@ -15,33 +15,26 @@
*/
package teetime.framework.signal;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class InitializingSignal implements ISignal {
private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class);
private final List<Exception> catchedExceptions = new LinkedList<Exception>();
public InitializingSignal() {}
public final class InitializingSignal extends AbstractSignal {
@Override
public void trigger(final Stage stage) {
try {
stage.onInitializing();
} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the start signal", e);
LOGGER.error("Exception while sending the initializing signal", e);
}
}
public List<Exception> getCatchedExceptions() {
return this.catchedExceptions;
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
return true;
}
}
......@@ -15,20 +15,12 @@
*/
package teetime.framework.signal;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class StartingSignal implements ISignal {
private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class);
private final List<Exception> catchedExceptions = new LinkedList<Exception>();
public StartingSignal() {}
public final class StartingSignal extends AbstractSignal {
@Override
public void trigger(final Stage stage) {
......@@ -40,8 +32,9 @@ public final class StartingSignal implements ISignal {
}
}
public List<Exception> getCatchedExceptions() {
return this.catchedExceptions;
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
return true;
}
}
......@@ -15,20 +15,12 @@
*/
package teetime.framework.signal;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
public final class TerminatingSignal implements ISignal {
private static final Logger LOGGER = LoggerFactory.getLogger(TerminatingSignal.class);
private final List<Exception> catchedExceptions = new LinkedList<Exception>();
public TerminatingSignal() {}
public final class TerminatingSignal extends AbstractSignal {
@Override
public void trigger(final Stage stage) {
......@@ -40,8 +32,9 @@ public final class TerminatingSignal implements ISignal {
}
}
public List<Exception> getCatchedExceptions() {
return this.catchedExceptions;
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
return receivedInputPorts.size() == allInputPorts.length;
}
}
......@@ -17,7 +17,9 @@ package teetime.framework.signal;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import teetime.framework.InputPort;
import teetime.framework.Stage;
import teetime.framework.validation.InvalidPortConnection;
......@@ -25,8 +27,6 @@ public final class ValidatingSignal implements ISignal {
private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>();
public ValidatingSignal() {}
@Override
public void trigger(final Stage stage) {
stage.onValidating(this.invalidPortConnections);
......@@ -36,4 +36,9 @@ public final class ValidatingSignal implements ISignal {
return this.invalidPortConnections;
}
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
return true;
}
}
......@@ -24,9 +24,6 @@ import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
/**
*
......@@ -85,34 +82,53 @@ public final class Merger<T> extends AbstractStage {
Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> inputPorts;
if (signalMap.containsKey(signalClass)) {
Set<InputPort<?>> set = signalMap.get(signalClass);
if (!set.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
signal.trigger(this);
sendSignalToOutputPorts(signal);
signalMap.remove(signalClass);
}
inputPorts = signalMap.get(signalClass);
} else {
Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
tempSet.add(inputPort);
signalMap.put((Class<ISignal>) signalClass, tempSet);
if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) {
signal.trigger(this);
sendSignalToOutputPorts(signal);
}
inputPorts = new HashSet<InputPort<?>>();
signalMap.put((Class<ISignal>) signalClass, inputPorts);
}
}
if (!inputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
private void sendSignalToOutputPorts(final ISignal signal) {
for (OutputPort<?> outputPort : getOutputPorts()) {
outputPort.sendSignal(signal);
if (signal.mayBeTriggered(inputPorts, getInputPorts())) {
super.onSignal(signal, inputPort);
}
// if (signalMap.containsKey(signalClass)) {
// Set<InputPort<?>> set = signalMap.get(signalClass);
// if (!set.add(inputPort)) {
// this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
// }
// if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
// triggerAndPassOn(signal);
// // signalMap.remove(signalClass);
// }
// } else {
// Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
// signalMap.put((Class<ISignal>) signalClass, tempSet);
// tempSet.add(inputPort);
// if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) {
// triggerAndPassOn(signal);
// }
// }
}
// private void triggerAndPassOn(final ISignal signal) {
// signal.trigger(this);
// sendSignalToOutputPorts(signal);
// }
// private void sendSignalToOutputPorts(final ISignal signal) {
// for (OutputPort<?> outputPort : getOutputPorts()) {
// outputPort.sendSignal(signal);
// }
// }
public IMergerStrategy getMergerStrategy() {
return this.strategy;
}
......
......@@ -18,6 +18,7 @@ package teetime.stage.basic.merger;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import teetime.framework.InputPort;
......@@ -29,83 +30,80 @@ public class MergerSignalTest {
private Merger<Integer> merger;
private InputPort<Integer> firstPort;
private InputPort<Integer> secondPort;
private MergerTestingPipe testPipe;
private MergerTestingPipe mergerOutputPipe;
@Before
public void beforeSignalTesting() {
merger = new Merger<Integer>();
firstPort = merger.getNewInputPort();
secondPort = merger.getNewInputPort();
testPipe = new MergerTestingPipe();
merger.getOutputPort().setPipe(testPipe);
mergerOutputPipe = new MergerTestingPipe();
merger.getOutputPort().setPipe(mergerOutputPipe);
}
@Test
public void testSameSignal() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
assertTrue(testPipe.startSent());
testPipe.reset();
assertTrue(mergerOutputPipe.startSent());
mergerOutputPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
assertFalse(testPipe.startSent());
assertFalse(mergerOutputPipe.startSent());
}
@Test
public void testDifferentSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
assertTrue(testPipe.startSent());
testPipe.reset();
assertTrue(mergerOutputPipe.startSent());
mergerOutputPipe.reset();
merger.onSignal(new TerminatingSignal(), secondPort);
assertFalse(testPipe.startSent());
assertFalse(mergerOutputPipe.startSent());
}
@Test
public void testInterleavedSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
assertTrue(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
assertTrue(mergerOutputPipe.startSent());
assertFalse(mergerOutputPipe.terminateSent());
mergerOutputPipe.reset();
merger.onSignal(new TerminatingSignal(), secondPort);
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
assertFalse(mergerOutputPipe.startSent());
assertFalse(mergerOutputPipe.terminateSent());
mergerOutputPipe.reset();
merger.onSignal(new TerminatingSignal(), firstPort);
assertFalse(testPipe.startSent());
assertTrue(testPipe.terminateSent());
testPipe.reset();
assertFalse(mergerOutputPipe.startSent());
assertTrue(mergerOutputPipe.terminateSent());
mergerOutputPipe.reset();
merger.onSignal(new TerminatingSignal(), firstPort);
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
assertFalse(mergerOutputPipe.startSent());
assertFalse(mergerOutputPipe.terminateSent());
mergerOutputPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
assertFalse(mergerOutputPipe.startSent());
assertFalse(mergerOutputPipe.terminateSent());
}
@Test
public void testMultipleSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
assertTrue(testPipe.startSent());
testPipe.reset();
assertTrue(mergerOutputPipe.startSent());
mergerOutputPipe.reset();
merger.onSignal(new StartingSignal(), firstPort);
assertFalse(testPipe.startSent());
testPipe.reset();
assertFalse(mergerOutputPipe.startSent());
mergerOutputPipe.reset();
merger.onSignal(new StartingSignal(), firstPort);
assertFalse(testPipe.startSent());
testPipe.reset();
assertFalse(mergerOutputPipe.startSent());
mergerOutputPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
assertFalse(testPipe.startSent());
assertFalse(mergerOutputPipe.startSent());
}
}
......@@ -22,18 +22,16 @@ import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public class MergerTestingPipe implements IPipe {
class MergerTestingPipe implements IPipe {
private boolean startSent = false;
private boolean terminateSent = false;
public MergerTestingPipe() {}
@Override
public void sendSignal(final ISignal signal) {
if (signal.getClass().equals(StartingSignal.class)) {
if (signal instanceof StartingSignal) {
this.startSent = true;
} else if (signal.getClass().equals(TerminatingSignal.class)) {
} else if (signal instanceof TerminatingSignal) {
this.terminateSent = true;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment