Skip to content
Snippets Groups Projects
Commit f52ab4e6 authored by Christian Wulf's avatar Christian Wulf
Browse files

added port type validation concept

parent 0bdca0ef
No related branches found
No related tags found
No related merge requests found
Showing
with 146 additions and 10 deletions
...@@ -18,7 +18,7 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -18,7 +18,7 @@ public abstract class AbstractStage implements StageWithPort {
/** /**
* A unique logger instance per stage instance * A unique logger instance per stage instance
*/ */
protected final Logger logger; // BETTER use SLF4J as interface and logback as impl protected final Logger logger;
private StageWithPort parentStage; private StageWithPort parentStage;
...@@ -131,14 +131,14 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -131,14 +131,14 @@ public abstract class AbstractStage implements StageWithPort {
protected <T> InputPort<T> createInputPort() { protected <T> InputPort<T> createInputPort() {
InputPort<T> inputPort = new InputPort<T>(this); InputPort<T> inputPort = new InputPort<T>(this);
// inputPort.setType(type); // TODO set type for input port // inputPort.setType(portType);
this.inputPortList.add(inputPort); this.inputPortList.add(inputPort);
return inputPort; return inputPort;
} }
protected <T> OutputPort<T> createOutputPort() { protected <T> OutputPort<T> createOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>(); OutputPort<T> outputPort = new OutputPort<T>();
// outputPort.setType(type); // TODO set type for output port // outputPort.setType(portType);
this.outputPortList.add(outputPort); this.outputPortList.add(outputPort);
return outputPort; return outputPort;
} }
......
...@@ -10,7 +10,7 @@ public abstract class ConsumerStage<I> extends AbstractStage { ...@@ -10,7 +10,7 @@ public abstract class ConsumerStage<I> extends AbstractStage {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
I element = this.inputPort.receive(); I element = this.getInputPort().receive();
boolean isReschedulable = this.determineReschedulability(); boolean isReschedulable = this.determineReschedulability();
this.setReschedulable(isReschedulable); this.setReschedulable(isReschedulable);
...@@ -24,7 +24,7 @@ public abstract class ConsumerStage<I> extends AbstractStage { ...@@ -24,7 +24,7 @@ public abstract class ConsumerStage<I> extends AbstractStage {
} }
protected boolean determineReschedulability() { protected boolean determineReschedulability() {
return this.inputPort.getPipe().size() > 0; return this.getInputPort().getPipe().size() > 0;
} }
protected abstract void execute(I element); protected abstract void execute(I element);
......
...@@ -23,6 +23,7 @@ public interface StageWithPort { ...@@ -23,6 +23,7 @@ public interface StageWithPort {
*/ */
boolean isReschedulable(); boolean isReschedulable();
// BETTER remove this method since it will be replaced by onTerminating()
void onIsPipelineHead(); void onIsPipelineHead();
void onSignal(Signal signal, InputPort<?> inputPort); void onSignal(Signal signal, InputPort<?> inputPort);
......
...@@ -26,6 +26,7 @@ public interface IPipe<T> { ...@@ -26,6 +26,7 @@ public interface IPipe<T> {
void setSignal(Signal signal); void setSignal(Signal signal);
// BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...}
void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort); void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
} }
...@@ -21,11 +21,17 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; ...@@ -21,11 +21,17 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class CollectorSink<T> extends ConsumerStage<T> { public class CollectorSink<T> extends ConsumerStage<T> {
// private final InputPort<T> inputPort = this.createInputPort();
//
// public final InputPort<T> getInputPort() {
// return this.inputPort;
// }
private final List<T> elements; private final List<T> elements;
private final int threshold; private final int threshold;
......
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
public class PortTypeConfiguration {
public static <T> void setPortTypes(final ObjectProducer<T> stage, final Class<T> clazz) {
stage.getOutputPort().setType(clazz);
}
public static <T> void setPortTypes(final CollectorSink<T> stage, final Class<T> clazz) {
stage.getInputPort().setType(clazz);
}
public static <T> void setPortTypes(final StartTimestampFilter stage) {
stage.getInputPort().setType(TimestampObject.class);
stage.getOutputPort().setType(TimestampObject.class);
}
}
...@@ -21,7 +21,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; ...@@ -21,7 +21,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class StartTimestampFilter extends ConsumerStage<TimestampObject> { public class StartTimestampFilter extends ConsumerStage<TimestampObject> {
...@@ -35,6 +35,6 @@ public class StartTimestampFilter extends ConsumerStage<TimestampObject> { ...@@ -35,6 +35,6 @@ public class StartTimestampFilter extends ConsumerStage<TimestampObject> {
} }
public OutputPort<TimestampObject> getOutputPort() { public OutputPort<TimestampObject> getOutputPort() {
return outputPort; return this.outputPort;
} }
} }
...@@ -21,7 +21,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; ...@@ -21,7 +21,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class StopTimestampFilter extends ConsumerStage<TimestampObject> { public class StopTimestampFilter extends ConsumerStage<TimestampObject> {
......
package teetime.variant.methodcallWithPorts.runtime.typeCheck;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.TypeVariable;
import org.junit.Test;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.PortTypeConfiguration;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
public class ConnectionTypeTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDynamicPortConnection() throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException,
IllegalArgumentException, InvocationTargetException, ClassNotFoundException {
ConstructorClosure<TimestampObject> constructorClosure = new ConstructorClosure<TimestampObject>() {
@Override
public TimestampObject create() {
return new TimestampObject();
}
};
Constructor<ObjectProducer> constructor = ObjectProducer.class.getConstructor(long.class, ConstructorClosure.class);
ObjectProducer objectProducer = constructor.newInstance(1, constructorClosure);
StartTimestampFilter startTimestampFilter = StartTimestampFilter.class.newInstance();
StopTimestampFilter stopTimestampFilter = StopTimestampFilter.class.newInstance();
Sink sink = Sink.class.newInstance();
PipeFactory pipeFactory = new PipeFactory();
IPipe pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(startTimestampFilter.getOutputPort(), stopTimestampFilter.getInputPort());
pipe = pipeFactory.create(ThreadCommunication.INTRA);
pipe.connectPorts(stopTimestampFilter.getOutputPort(), sink.getInputPort());
/*
* requirements:
* <ul>
* <li>when selecting a stage class to create one instance, the user is prompted to declare each type argument
* <li>
* </ul>
*/
TypeVariable<Class<ObjectProducer>>[] objectProducerTypeParameters = ObjectProducer.class.getTypeParameters();
for (TypeVariable<Class<ObjectProducer>> typeVariable : objectProducerTypeParameters) {
System.out.println(typeVariable.getBounds()); // ->[Ljava.lang.reflect.Type;@13a65d1f
System.out.println(typeVariable.getBounds().length); // ->1
System.out.println(typeVariable.getBounds()[0]); // ->class java.lang.Object
System.out.println(typeVariable.getName()); // ->T
System.out.println(typeVariable.getClass()); // ->class sun.reflect.generics.reflectiveObjects.TypeVariableImpl
System.out.println(typeVariable.getGenericDeclaration()); // ->class teetime.variant.methodcallWithPorts.stage.ObjectProducer
}
// TypeVariable<?>[] objectProducerOutputPortTypeParameters = objectProducer.getOutputPort().getClass().getTypeParameters();
// for (TypeVariable<?> typeVariable : objectProducerOutputPortTypeParameters) {
// System.out.println(typeVariable.getBounds()); // ->[Ljava.lang.reflect.Type;@20a12d8f
// System.out.println(typeVariable.getBounds().length); // ->1
// System.out.println(typeVariable.getBounds()[0]); // ->class java.lang.Object
// System.out.println(typeVariable.getName()); // ->T
// System.out.println(typeVariable.getClass()); // ->class sun.reflect.generics.reflectiveObjects.TypeVariableImpl
// System.out.println(typeVariable.getGenericDeclaration()); // ->class teetime.variant.methodcallWithPorts.framework.core.OutputPort
// }
//
// TypeVariable<?>[] startTimestampFilterOutputPortTypeParameters = startTimestampFilter.getOutputPort().getClass().getTypeParameters();
// for (TypeVariable<?> typeVariable : startTimestampFilterOutputPortTypeParameters) {
// System.out.println(typeVariable.getBounds()); // ->[Ljava.lang.reflect.Type;@7b365f02
// System.out.println(typeVariable.getBounds().length); // ->1
// System.out.println(typeVariable.getBounds()[0]); // ->class java.lang.Object
// System.out.println(typeVariable.getName()); // ->T
// System.out.println(typeVariable.getClass()); // ->class sun.reflect.generics.reflectiveObjects.TypeVariableImpl
// System.out.println(typeVariable.getGenericDeclaration()); // ->class teetime.variant.methodcallWithPorts.framework.core.OutputPort
// }
Class<?> currentClass = objectProducer.getClass();
while (currentClass.getSuperclass() != null) { // we don't want to process Object.class
Field[] fields = currentClass.getDeclaredFields();
for (Field field : fields) {
// System.out.println("Field: " + field.getType());
if (OutputPort.class.equals(field.getType())) {
System.out.println("Field.name: " + field.getName());
System.out.println("Field.type: " + field.getType());
// field.getType()
}
}
currentClass = currentClass.getSuperclass();
}
System.out.println(objectProducer.getOutputPort().getType());
PortTypeConfiguration.setPortTypes(objectProducer, Class.forName("teetime.variant.explicitScheduling.examples.throughput.TimestampObject"));
System.out.println(objectProducer.getOutputPort().getType());
}
}
Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment