diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 666863efdd79b3e1e053be14d591dd84aaf8aea9..6f3cf0e27497ff4fb545f13784582a601c31c1d4 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -29,7 +29,7 @@ public abstract class AbstractStage extends Stage { private static final IPipe DUMMY_PORT = new DummyPipe(); - private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); + private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private InputPort<?>[] inputPorts = new InputPort<?>[0]; private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; @@ -73,16 +73,16 @@ public abstract class AbstractStage extends Stage { * @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise */ protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) { - boolean signalAlreadyReceived = this.triggeredSignals.contains(signal); + boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass()); if (signalAlreadyReceived) { if (logger.isTraceEnabled()) { - logger.trace("Got signal: " + signal + " again from input port: " + inputPort); + logger.trace("Got signal again: " + signal + " from input port: " + inputPort); } } else { if (logger.isTraceEnabled()) { logger.trace("Got signal: " + signal + " from input port: " + inputPort); } - this.triggeredSignals.add(signal); + this.triggeredSignalTypes.add(signal.getClass()); } return signalAlreadyReceived; } diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 4d23baaad7a2030dd3bbe361f8161b8bb63dca65..beea4479857fe6c3aff579281a942df1fc13a355 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -69,5 +69,4 @@ public final class OutputPort<T> extends AbstractPort<T> { } pipe.sendSignal(signal); } - } diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 83326df1f05d75327995622876c070bd03afcd95..eba8019a120575f7982a80a1466559d01404b5d9 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -15,9 +15,22 @@ */ package teetime.framework.signal; +import java.util.Set; + +import teetime.framework.InputPort; import teetime.framework.Stage; public interface ISignal { void trigger(Stage stage); + + /** + * Used by the merger only (so far) + * + * @param receivedInputPorts + * @param allInputPorts + * @param stageState + * @return <code>true</code> iff the signal may be triggered, <code>false</code> otherwise. + */ + boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, InputPort<?>[] allInputPorts); } diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java index bb631edcf9aac720b8bb9ff64833dbc34044eef1..37fe60d24c7cfaaea88d7b7ad75b8320f53fd58d 100644 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ b/src/main/java/teetime/framework/signal/InitializingSignal.java @@ -15,33 +15,26 @@ */ package teetime.framework.signal; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; +import teetime.framework.InputPort; import teetime.framework.Stage; -public final class InitializingSignal implements ISignal { - - private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class); - private final List<Exception> catchedExceptions = new LinkedList<Exception>(); - - public InitializingSignal() {} +public final class InitializingSignal extends AbstractSignal { @Override public void trigger(final Stage stage) { try { stage.onInitializing(); - } catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception) + } catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception) this.catchedExceptions.add(e); - LOGGER.error("Exception while sending the start signal", e); + LOGGER.error("Exception while sending the initializing signal", e); } } - public List<Exception> getCatchedExceptions() { - return this.catchedExceptions; + @Override + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + return true; } } diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index aea07608668cc1449a58c9abea0fac85ad65c520..29f2a99a71ffcee1f2a844736a873eea8a64c046 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -15,20 +15,12 @@ */ package teetime.framework.signal; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; +import teetime.framework.InputPort; import teetime.framework.Stage; -public final class StartingSignal implements ISignal { - - private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class); - private final List<Exception> catchedExceptions = new LinkedList<Exception>(); - - public StartingSignal() {} +public final class StartingSignal extends AbstractSignal { @Override public void trigger(final Stage stage) { @@ -40,8 +32,9 @@ public final class StartingSignal implements ISignal { } } - public List<Exception> getCatchedExceptions() { - return this.catchedExceptions; + @Override + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + return true; } } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 6ff849490c99d57cbcf92eecc4506caabdc78af1..6be1b182ba7a4d40788103dddc7e42d74bd9824e 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -15,20 +15,12 @@ */ package teetime.framework.signal; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; +import teetime.framework.InputPort; import teetime.framework.Stage; -public final class TerminatingSignal implements ISignal { - - private static final Logger LOGGER = LoggerFactory.getLogger(TerminatingSignal.class); - private final List<Exception> catchedExceptions = new LinkedList<Exception>(); - - public TerminatingSignal() {} +public final class TerminatingSignal extends AbstractSignal { @Override public void trigger(final Stage stage) { @@ -40,8 +32,9 @@ public final class TerminatingSignal implements ISignal { } } - public List<Exception> getCatchedExceptions() { - return this.catchedExceptions; + @Override + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + return receivedInputPorts.size() == allInputPorts.length; } } diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java index a15b37ee44f7da71e770b02a7c48322eccd83d4f..a0a4ea096de5951ea7708425735137432958eee4 100644 --- a/src/main/java/teetime/framework/signal/ValidatingSignal.java +++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java @@ -17,7 +17,9 @@ package teetime.framework.signal; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import teetime.framework.InputPort; import teetime.framework.Stage; import teetime.framework.validation.InvalidPortConnection; @@ -25,8 +27,6 @@ public final class ValidatingSignal implements ISignal { private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>(); - public ValidatingSignal() {} - @Override public void trigger(final Stage stage) { stage.onValidating(this.invalidPortConnections); @@ -36,4 +36,9 @@ public final class ValidatingSignal implements ISignal { return this.invalidPortConnections; } + @Override + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + return true; + } + } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index e064fde9b718a37dda3ddf663a706122752cbd33..4bf4ef573320f0e16238a3a173608e5d24453dfb 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -24,9 +24,6 @@ import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; -import teetime.framework.signal.InitializingSignal; -import teetime.framework.signal.StartingSignal; -import teetime.framework.signal.TerminatingSignal; /** * @@ -85,34 +82,53 @@ public final class Merger<T> extends AbstractStage { Class<? extends ISignal> signalClass = signal.getClass(); + Set<InputPort<?>> inputPorts; if (signalMap.containsKey(signalClass)) { - Set<InputPort<?>> set = signalMap.get(signalClass); - if (!set.add(inputPort)) { - this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); - } - if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) { - signal.trigger(this); - sendSignalToOutputPorts(signal); - signalMap.remove(signalClass); - } + inputPorts = signalMap.get(signalClass); } else { - Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); - tempSet.add(inputPort); - signalMap.put((Class<ISignal>) signalClass, tempSet); - if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) { - signal.trigger(this); - sendSignalToOutputPorts(signal); - } + inputPorts = new HashSet<InputPort<?>>(); + signalMap.put((Class<ISignal>) signalClass, inputPorts); } - } + if (!inputPorts.add(inputPort)) { + this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); + } - private void sendSignalToOutputPorts(final ISignal signal) { - for (OutputPort<?> outputPort : getOutputPorts()) { - outputPort.sendSignal(signal); + if (signal.mayBeTriggered(inputPorts, getInputPorts())) { + super.onSignal(signal, inputPort); } + + // if (signalMap.containsKey(signalClass)) { + // Set<InputPort<?>> set = signalMap.get(signalClass); + // if (!set.add(inputPort)) { + // this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); + // } + // if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) { + // triggerAndPassOn(signal); + // // signalMap.remove(signalClass); + // } + // } else { + // Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); + // signalMap.put((Class<ISignal>) signalClass, tempSet); + // tempSet.add(inputPort); + // if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) { + // triggerAndPassOn(signal); + // } + // } + } + // private void triggerAndPassOn(final ISignal signal) { + // signal.trigger(this); + // sendSignalToOutputPorts(signal); + // } + + // private void sendSignalToOutputPorts(final ISignal signal) { + // for (OutputPort<?> outputPort : getOutputPorts()) { + // outputPort.sendSignal(signal); + // } + // } + public IMergerStrategy getMergerStrategy() { return this.strategy; } diff --git a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java index b1967bcd43e5f40cf1556907bed08dfe161ffb41..35ca6644c51644dc2b1f3c334deb7a5d37b042de 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java +++ b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java @@ -18,6 +18,7 @@ package teetime.stage.basic.merger; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.junit.Before; import org.junit.Test; import teetime.framework.InputPort; @@ -29,83 +30,80 @@ public class MergerSignalTest { private Merger<Integer> merger; private InputPort<Integer> firstPort; private InputPort<Integer> secondPort; - private MergerTestingPipe testPipe; + private MergerTestingPipe mergerOutputPipe; + @Before public void beforeSignalTesting() { merger = new Merger<Integer>(); firstPort = merger.getNewInputPort(); secondPort = merger.getNewInputPort(); - testPipe = new MergerTestingPipe(); - merger.getOutputPort().setPipe(testPipe); + mergerOutputPipe = new MergerTestingPipe(); + merger.getOutputPort().setPipe(mergerOutputPipe); } @Test public void testSameSignal() { - this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - assertTrue(testPipe.startSent()); - testPipe.reset(); + assertTrue(mergerOutputPipe.startSent()); + mergerOutputPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - assertFalse(testPipe.startSent()); + assertFalse(mergerOutputPipe.startSent()); } @Test public void testDifferentSignals() { - this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - assertTrue(testPipe.startSent()); - testPipe.reset(); + assertTrue(mergerOutputPipe.startSent()); + mergerOutputPipe.reset(); merger.onSignal(new TerminatingSignal(), secondPort); - assertFalse(testPipe.startSent()); + assertFalse(mergerOutputPipe.startSent()); } @Test public void testInterleavedSignals() { - this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - assertTrue(testPipe.startSent()); - assertFalse(testPipe.terminateSent()); - testPipe.reset(); + assertTrue(mergerOutputPipe.startSent()); + assertFalse(mergerOutputPipe.terminateSent()); + mergerOutputPipe.reset(); merger.onSignal(new TerminatingSignal(), secondPort); - assertFalse(testPipe.startSent()); - assertFalse(testPipe.terminateSent()); - testPipe.reset(); + assertFalse(mergerOutputPipe.startSent()); + assertFalse(mergerOutputPipe.terminateSent()); + mergerOutputPipe.reset(); merger.onSignal(new TerminatingSignal(), firstPort); - assertFalse(testPipe.startSent()); - assertTrue(testPipe.terminateSent()); - testPipe.reset(); + assertFalse(mergerOutputPipe.startSent()); + assertTrue(mergerOutputPipe.terminateSent()); + mergerOutputPipe.reset(); merger.onSignal(new TerminatingSignal(), firstPort); - assertFalse(testPipe.startSent()); - assertFalse(testPipe.terminateSent()); - testPipe.reset(); + assertFalse(mergerOutputPipe.startSent()); + assertFalse(mergerOutputPipe.terminateSent()); + mergerOutputPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - assertFalse(testPipe.startSent()); - assertFalse(testPipe.terminateSent()); + assertFalse(mergerOutputPipe.startSent()); + assertFalse(mergerOutputPipe.terminateSent()); } @Test public void testMultipleSignals() { - this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - assertTrue(testPipe.startSent()); - testPipe.reset(); + assertTrue(mergerOutputPipe.startSent()); + mergerOutputPipe.reset(); merger.onSignal(new StartingSignal(), firstPort); - assertFalse(testPipe.startSent()); - testPipe.reset(); + assertFalse(mergerOutputPipe.startSent()); + mergerOutputPipe.reset(); merger.onSignal(new StartingSignal(), firstPort); - assertFalse(testPipe.startSent()); - testPipe.reset(); + assertFalse(mergerOutputPipe.startSent()); + mergerOutputPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - assertFalse(testPipe.startSent()); + assertFalse(mergerOutputPipe.startSent()); } } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index afbc6561b5acb64babb446f5cba1893ca5f1f53a..42696bf77420db1cbc4693771311b38ef5d47152 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -22,18 +22,16 @@ import teetime.framework.signal.ISignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; -public class MergerTestingPipe implements IPipe { +class MergerTestingPipe implements IPipe { private boolean startSent = false; private boolean terminateSent = false; - public MergerTestingPipe() {} - @Override public void sendSignal(final ISignal signal) { - if (signal.getClass().equals(StartingSignal.class)) { + if (signal instanceof StartingSignal) { this.startSent = true; - } else if (signal.getClass().equals(TerminatingSignal.class)) { + } else if (signal instanceof TerminatingSignal) { this.terminateSent = true; } }