diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 531af2d74fe2d06bc457b41b6eace9236407388b..59012068d271b5ab42fb903cf8da037e51a34575 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.TerminateException; -import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; @@ -279,7 +278,7 @@ public abstract class AbstractStage { } public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { - this.validateOutputPorts(invalidPortConnections); + this.checkTypeCompliance(invalidPortConnections); changeState(StageState.VALIDATED); } @@ -299,6 +298,27 @@ public abstract class AbstractStage { calledOnStarting = true; } + /** + * Checks if connections to this pipe are correct in regards to type compliance. + * Incoming elements must be instanceof input port type. + * + * @param invalidPortConnections + * List of invalid connections. Adding invalid connections to this list is a performance advantage in comparison to returning a list by each stage. + */ + private void checkTypeCompliance(final List<InvalidPortConnection> invalidPortConnections) { + for (InputPort<?> port : getInputPorts()) { + Class<?> targetType = port.getType(); + Class<?> sourceType = port.pipe.getSourcePort().getType(); + if (targetType != null && sourceType != null) { + if (!targetType.isAssignableFrom(sourceType)) { // if targetType is not superclass of sourceType + invalidPortConnections.add(new InvalidPortConnection(port.pipe.getSourcePort(), port)); + // throw new IllegalStateException("2002 - Invalid pipe at " + port.toString() + ": " + targetType + " is not a superclass/type of " + + // sourceType); + } + } + } + } + @SuppressWarnings("PMD.SignatureDeclareThrowsException") public void onTerminating() throws Exception { changeState(StageState.TERMINATED); @@ -429,26 +449,6 @@ public abstract class AbstractStage { return outputPort; } - /** - * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. - * - * @param invalidPortConnections - * <i>(Passed as parameter for performance reasons)</i> - */ - @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { - final IPipe<?> pipe = outputPort.getPipe(); - - final Class<?> sourcePortType = outputPort.getType(); - final Class<?> targetPortType = pipe.getTargetPort().getType(); - if (null == sourcePortType || !sourcePortType.equals(targetPortType)) { - final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort()); - invalidPortConnections.add(invalidPortConnection); - } - } - } - protected void terminate() { changeState(StageState.TERMINATING); } diff --git a/src/main/java/teetime/framework/AbstractSynchedPipe.java b/src/main/java/teetime/framework/AbstractSynchedPipe.java index ce6f322a1ecbb2e269afe4308765166149b90044..5c76b3f95580eb5776e9dfe423979010deaaec3e 100644 --- a/src/main/java/teetime/framework/AbstractSynchedPipe.java +++ b/src/main/java/teetime/framework/AbstractSynchedPipe.java @@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.ValidatingSignal; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; @@ -67,6 +68,10 @@ public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> { @Override public final void waitForStartSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); + if (signal instanceof ValidatingSignal) { + this.waitForStartSignal(); + return; + } if (!(signal instanceof StartingSignal)) { throw new IllegalStateException( "2001 - Expected StartingSignal, but was " + signal.getClass().getSimpleName() + " in " + getTargetPort().getOwningStage().getId()); diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 7bf68323354bdeca93c2f703bcd392cde8d08cf5..660133e77ce972305d48699958a3838f66017045 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -56,7 +56,7 @@ public final class Execution<T extends Configuration> { * to be used for the analysis */ public Execution(final T configuration) { - this(configuration, false); + this(configuration, true); } /** @@ -74,10 +74,10 @@ public final class Execution<T extends Configuration> { throw new IllegalStateException("3001 - Configuration has already been used."); } configuration.setInitialized(true); + init(); if (validationEnabled) { validateStages(); } - init(); } // BETTER validate concurrently diff --git a/src/main/java/teetime/framework/validation/AnalysisNotValidException.java b/src/main/java/teetime/framework/validation/AnalysisNotValidException.java index e915f70c939f3f47f48c0081d59ee41a1c129026..e7028dc98cb5829b2b0a47bdb03c0ecd214a07a5 100644 --- a/src/main/java/teetime/framework/validation/AnalysisNotValidException.java +++ b/src/main/java/teetime/framework/validation/AnalysisNotValidException.java @@ -34,6 +34,7 @@ public class AnalysisNotValidException extends RuntimeException { @Override public String getMessage() { final StringBuilder builder = new StringBuilder(this.invalidPortConnections.size() * 40); + builder.append("2002 - "); builder.append(this.invalidPortConnections.size()); builder.append(" invalid port connections were detected.\n"); Joiner.on("\n").appendTo(builder, this.invalidPortConnections); diff --git a/src/test/java/teetime/framework/AbstractStageTest.java b/src/test/java/teetime/framework/AbstractStageTest.java index 3733e80585054c53007b9e1886761265ef87c75c..f00db03f9001554fdcb4ed23eb3edb18157b8bd7 100644 --- a/src/test/java/teetime/framework/AbstractStageTest.java +++ b/src/test/java/teetime/framework/AbstractStageTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.junit.Assert; import org.junit.Before; @@ -28,6 +29,7 @@ import org.junit.Test; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; +import teetime.framework.validation.AnalysisNotValidException; import teetime.stage.Cache; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; @@ -98,6 +100,40 @@ public class AbstractStageTest { } + @Test(expected = AnalysisNotValidException.class) + public void testCheckTypeCompliance() throws Exception { + try { + // Correct connection + new Execution<Configuration>(new TestConnectionsConfig(false), true).executeBlocking(); + } catch (AnalysisNotValidException e) { + fail(); + } + // Incorrect connection should fail! + new Execution<Configuration>(new TestConnectionsConfig(true), true).executeBlocking(); + } + + private class TestConnectionsConfig extends Configuration { + + @SuppressWarnings({ "unchecked", "rawtypes" }) + TestConnectionsConfig(final boolean fails) { + EmptyStage stage = new EmptyStage(); + if (fails) { + connectPorts((OutputPort) new EmptyStage().createOutputPort(Object.class), new EmptyStage().createInputPort(Integer.class)); + } else { + connectPorts(stage.createOutputPort(Integer.class), new EmptyStage().createInputPort(Object.class)); + } + stage.declareActive(); + } + + } + + private class EmptyStage extends AbstractStage { + + @Override + protected void execute() { + terminate(); + } + } // // // Moved from MergerSignalTest diff --git a/src/test/java/teetime/stage/basic/distributor/DistributorTest.java b/src/test/java/teetime/stage/basic/distributor/DistributorTest.java index c7cc231c0285951ddcf1a26e6375fd09d08bd741..59cd6239eb274a0c8d768f8ee65dbd98ff99ad35 100644 --- a/src/test/java/teetime/stage/basic/distributor/DistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/DistributorTest.java @@ -24,15 +24,17 @@ import static teetime.framework.test.StageTester.test; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import teetime.framework.ExecutionException; +import teetime.stage.basic.distributor.strategy.BlockingRoundRobinStrategy; import teetime.stage.basic.distributor.strategy.CloneStrategy; import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; -import teetime.stage.basic.distributor.strategy.BlockingRoundRobinStrategy; import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy; /** @@ -128,11 +130,21 @@ public class DistributorTest { @Test public void cloneForIntegerShouldNotWork() throws Exception { this.distributor.setStrategy(new CloneStrategy()); - this.distributor.getNewOutputPort(); - this.distributor.onStarting(); expectedException.expect(IllegalStateException.class); - this.distributor.execute(1); + + try { + test(distributor).and().send(1).to(distributor.getInputPort()).and().receive(firstIntegers).from(distributor.getNewOutputPort()).and() + .start(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + for (Entry<Thread, List<Exception>> entry : e.getThrownExceptions().entrySet()) { + for (Exception value : entry.getValue()) { + throw value; + } + } + } + } @Test