Skip to content
Snippets Groups Projects
Commit 390d3099 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge branch 'runtime-pipe-check' into 'master'

Runtime pipe check

check is executed upon onStarting... it just takes a look at the type of the output port and compares it to the input port
A Test for a false connection can't be done without a DSL or such, as the generics prohibit a false connection in the IDE.

See merge request !78
parents 54b9781d d385091c
No related branches found
No related tags found
No related merge requests found
...@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; ...@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.TerminateException; import teetime.framework.exceptionHandling.TerminateException;
import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
...@@ -279,7 +278,7 @@ public abstract class AbstractStage { ...@@ -279,7 +278,7 @@ public abstract class AbstractStage {
} }
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections); this.checkTypeCompliance(invalidPortConnections);
changeState(StageState.VALIDATED); changeState(StageState.VALIDATED);
} }
...@@ -299,6 +298,27 @@ public abstract class AbstractStage { ...@@ -299,6 +298,27 @@ public abstract class AbstractStage {
calledOnStarting = true; calledOnStarting = true;
} }
/**
* Checks if connections to this pipe are correct in regards to type compliance.
* Incoming elements must be instanceof input port type.
*
* @param invalidPortConnections
* List of invalid connections. Adding invalid connections to this list is a performance advantage in comparison to returning a list by each stage.
*/
private void checkTypeCompliance(final List<InvalidPortConnection> invalidPortConnections) {
for (InputPort<?> port : getInputPorts()) {
Class<?> targetType = port.getType();
Class<?> sourceType = port.pipe.getSourcePort().getType();
if (targetType != null && sourceType != null) {
if (!targetType.isAssignableFrom(sourceType)) { // if targetType is not superclass of sourceType
invalidPortConnections.add(new InvalidPortConnection(port.pipe.getSourcePort(), port));
// throw new IllegalStateException("2002 - Invalid pipe at " + port.toString() + ": " + targetType + " is not a superclass/type of " +
// sourceType);
}
}
}
}
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
public void onTerminating() throws Exception { public void onTerminating() throws Exception {
changeState(StageState.TERMINATED); changeState(StageState.TERMINATED);
...@@ -429,26 +449,6 @@ public abstract class AbstractStage { ...@@ -429,26 +449,6 @@ public abstract class AbstractStage {
return outputPort; return outputPort;
} }
/**
* This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors.
*
* @param invalidPortConnections
* <i>(Passed as parameter for performance reasons)</i>
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) {
final IPipe<?> pipe = outputPort.getPipe();
final Class<?> sourcePortType = outputPort.getType();
final Class<?> targetPortType = pipe.getTargetPort().getType();
if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
invalidPortConnections.add(invalidPortConnection);
}
}
}
protected void terminate() { protected void terminate() {
changeState(StageState.TERMINATING); changeState(StageState.TERMINATING);
} }
......
...@@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference; ...@@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.PCBlockingQueue;
import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy;
...@@ -67,6 +68,10 @@ public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> { ...@@ -67,6 +68,10 @@ public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> {
@Override @Override
public final void waitForStartSignal() throws InterruptedException { public final void waitForStartSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
if (signal instanceof ValidatingSignal) {
this.waitForStartSignal();
return;
}
if (!(signal instanceof StartingSignal)) { if (!(signal instanceof StartingSignal)) {
throw new IllegalStateException( throw new IllegalStateException(
"2001 - Expected StartingSignal, but was " + signal.getClass().getSimpleName() + " in " + getTargetPort().getOwningStage().getId()); "2001 - Expected StartingSignal, but was " + signal.getClass().getSimpleName() + " in " + getTargetPort().getOwningStage().getId());
......
...@@ -56,7 +56,7 @@ public final class Execution<T extends Configuration> { ...@@ -56,7 +56,7 @@ public final class Execution<T extends Configuration> {
* to be used for the analysis * to be used for the analysis
*/ */
public Execution(final T configuration) { public Execution(final T configuration) {
this(configuration, false); this(configuration, true);
} }
/** /**
...@@ -74,10 +74,10 @@ public final class Execution<T extends Configuration> { ...@@ -74,10 +74,10 @@ public final class Execution<T extends Configuration> {
throw new IllegalStateException("3001 - Configuration has already been used."); throw new IllegalStateException("3001 - Configuration has already been used.");
} }
configuration.setInitialized(true); configuration.setInitialized(true);
init();
if (validationEnabled) { if (validationEnabled) {
validateStages(); validateStages();
} }
init();
} }
// BETTER validate concurrently // BETTER validate concurrently
......
...@@ -34,6 +34,7 @@ public class AnalysisNotValidException extends RuntimeException { ...@@ -34,6 +34,7 @@ public class AnalysisNotValidException extends RuntimeException {
@Override @Override
public String getMessage() { public String getMessage() {
final StringBuilder builder = new StringBuilder(this.invalidPortConnections.size() * 40); final StringBuilder builder = new StringBuilder(this.invalidPortConnections.size() * 40);
builder.append("2002 - ");
builder.append(this.invalidPortConnections.size()); builder.append(this.invalidPortConnections.size());
builder.append(" invalid port connections were detected.\n"); builder.append(" invalid port connections were detected.\n");
Joiner.on("\n").appendTo(builder, this.invalidPortConnections); Joiner.on("\n").appendTo(builder, this.invalidPortConnections);
......
...@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; ...@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
...@@ -28,6 +29,7 @@ import org.junit.Test; ...@@ -28,6 +29,7 @@ import org.junit.Test;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
import teetime.framework.validation.AnalysisNotValidException;
import teetime.stage.Cache; import teetime.stage.Cache;
import teetime.stage.Counter; import teetime.stage.Counter;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
...@@ -98,6 +100,40 @@ public class AbstractStageTest { ...@@ -98,6 +100,40 @@ public class AbstractStageTest {
} }
@Test(expected = AnalysisNotValidException.class)
public void testCheckTypeCompliance() throws Exception {
try {
// Correct connection
new Execution<Configuration>(new TestConnectionsConfig(false), true).executeBlocking();
} catch (AnalysisNotValidException e) {
fail();
}
// Incorrect connection should fail!
new Execution<Configuration>(new TestConnectionsConfig(true), true).executeBlocking();
}
private class TestConnectionsConfig extends Configuration {
@SuppressWarnings({ "unchecked", "rawtypes" })
TestConnectionsConfig(final boolean fails) {
EmptyStage stage = new EmptyStage();
if (fails) {
connectPorts((OutputPort) new EmptyStage().createOutputPort(Object.class), new EmptyStage().createInputPort(Integer.class));
} else {
connectPorts(stage.createOutputPort(Integer.class), new EmptyStage().createInputPort(Object.class));
}
stage.declareActive();
}
}
private class EmptyStage extends AbstractStage {
@Override
protected void execute() {
terminate();
}
}
// //
// //
// Moved from MergerSignalTest // Moved from MergerSignalTest
......
...@@ -24,15 +24,17 @@ import static teetime.framework.test.StageTester.test; ...@@ -24,15 +24,17 @@ import static teetime.framework.test.StageTester.test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map.Entry;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import teetime.framework.ExecutionException;
import teetime.stage.basic.distributor.strategy.BlockingRoundRobinStrategy;
import teetime.stage.basic.distributor.strategy.CloneStrategy; import teetime.stage.basic.distributor.strategy.CloneStrategy;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.strategy.BlockingRoundRobinStrategy;
import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy; import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy;
/** /**
...@@ -128,11 +130,21 @@ public class DistributorTest { ...@@ -128,11 +130,21 @@ public class DistributorTest {
@Test @Test
public void cloneForIntegerShouldNotWork() throws Exception { public void cloneForIntegerShouldNotWork() throws Exception {
this.distributor.setStrategy(new CloneStrategy()); this.distributor.setStrategy(new CloneStrategy());
this.distributor.getNewOutputPort();
this.distributor.onStarting();
expectedException.expect(IllegalStateException.class); expectedException.expect(IllegalStateException.class);
this.distributor.execute(1);
try {
test(distributor).and().send(1).to(distributor.getInputPort()).and().receive(firstIntegers).from(distributor.getNewOutputPort()).and()
.start();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
for (Entry<Thread, List<Exception>> entry : e.getThrownExceptions().entrySet()) {
for (Exception value : entry.getValue()) {
throw value;
}
}
}
} }
@Test @Test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment