Skip to content
Snippets Groups Projects
Commit a00cabaa authored by Christian Wulf's avatar Christian Wulf
Browse files

implemented unfinished generic signal concept;

implemented example signals for starting, validating, and terminating
parent 6bca3bd2
No related branches found
No related tags found
No related merge requests found
Showing
with 163 additions and 99 deletions
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
......@@ -10,6 +9,8 @@ import org.slf4j.LoggerFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection;
public abstract class AbstractStage implements StageWithPort {
......@@ -56,14 +57,6 @@ public abstract class AbstractStage implements StageWithPort {
return true;
}
@Override
public void onStart() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
}
@SuppressWarnings("unchecked")
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
......@@ -74,11 +67,6 @@ public abstract class AbstractStage implements StageWithPort {
}
}
protected void onFinished() {
// empty default implementation
this.onIsPipelineHead();
}
protected InputPort<?>[] getInputPorts() {
return this.cachedInputPorts;
}
......@@ -116,22 +104,31 @@ public abstract class AbstractStage implements StageWithPort {
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
signal.trigger(this);
for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal);
}
}
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections);
}
public void onStarting() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
}
public void onTerminating() {
// empty default implementation
this.onIsPipelineHead();
}
protected <T> InputPort<T> createInputPort() {
InputPort<T> inputPort = new InputPort<T>(this);
// inputPort.setType(type); // TODO set type for input port
......@@ -146,9 +143,8 @@ public abstract class AbstractStage implements StageWithPort {
return outputPort;
}
public List<InvalidPortConnection> validateOutputPorts() {
List<InvalidPortConnection> invalidOutputPortMessages = new LinkedList<InvalidPortConnection>();
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (OutputPort<?> outputPort : this.getOutputPorts()) {
IPipe<?> pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
......@@ -156,12 +152,10 @@ public abstract class AbstractStage implements StageWithPort {
Class<?> targetPortType = pipe.getTargetPort().getType();
if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
invalidOutputPortMessages.add(invalidPortConnection);
invalidPortConnections.add(invalidPortConnection);
}
}
}
return invalidOutputPortMessages;
}
@Override
......
package teetime.variant.methodcallWithPorts.framework.core;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public class OutputPort<T> extends AbstractPort<T> {
/**
......
......@@ -5,13 +5,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal;
import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
/**
*
*
* @author Christian Wulf
*
*
* @param <FirstStage>
* @param <LastStage>
*/
......@@ -28,13 +32,7 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private LastStage lastStage;
// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
// multiple input ports?
private StageWithPort[] stages;
private StageWithPort parentStage;
// private int startIndex;
private int firstStageIndex;
// private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>();
......@@ -70,7 +68,7 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
@Override
public void executeWithPorts() {
StageWithPort headStage = this.stages[this.firstStageIndex];
StageWithPort headStage = this.firstStage;
// do {
headStage.executeWithPorts();
......@@ -106,16 +104,16 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
// do nothing
}
@Override
public void onStart() {
@Deprecated
public void onStarting() {
int size = 1 + this.intermediateStages.size() + 1;
this.stages = new StageWithPort[size];
this.stages[0] = this.firstStage;
StageWithPort[] stages = new StageWithPort[size];
stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) {
StageWithPort stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage;
stages[1 + i] = stage;
}
this.stages[this.stages.length - 1] = this.lastStage;
stages[stages.length - 1] = this.lastStage;
// for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
......@@ -129,8 +127,8 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
// }
// this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
for (StageWithPort stage : this.stages) {
stage.onStart();
for (StageWithPort stage : stages) {
stage.onSignal(new StartingSignal(), null);
}
}
......@@ -146,14 +144,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
@Override
public boolean isReschedulable() {
// return this.reschedulable;
return this.firstStage.isReschedulable();
}
// public void setReschedulable(final boolean reschedulable) {
// this.reschedulable = reschedulable;
// }
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort);
......@@ -167,4 +160,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
return this.lastStage;
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
// do nothing
}
}
......@@ -3,10 +3,15 @@ package teetime.variant.methodcallWithPorts.framework.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal;
import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
import teetime.variant.methodcallWithPorts.framework.core.signal.ValidatingSignal;
public class RunnableStage implements Runnable {
private final StageWithPort stage;
private final Logger logger;
private boolean validationEnabled;
public RunnableStage(final StageWithPort stage) {
this.stage = stage;
......@@ -17,14 +22,25 @@ public class RunnableStage implements Runnable {
public void run() {
this.logger.debug("Executing runnable stage...");
if (this.validationEnabled) {
ValidatingSignal validatingSignal = new ValidatingSignal();
this.stage.onSignal(validatingSignal, null);
if (validatingSignal.getInvalidPortConnections().size() > 0) {
// throw new RuntimeException(message);
// TODO implement what to do on validation messages
}
}
try {
this.stage.onStart();
StartingSignal startingSignal = new StartingSignal();
this.stage.onSignal(startingSignal, null);
do {
this.stage.executeWithPorts();
} while (this.stage.isReschedulable());
this.stage.onSignal(Signal.FINISHED, null);
TerminatingSignal terminatingSignal = new TerminatingSignal();
this.stage.onSignal(terminatingSignal, null);
} catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e);
......@@ -33,4 +49,12 @@ public class RunnableStage implements Runnable {
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
}
public boolean isValidationEnabled() {
return this.validationEnabled;
}
public void setValidationEnabled(final boolean validationEnabled) {
this.validationEnabled = validationEnabled;
}
}
package teetime.variant.methodcallWithPorts.framework.core;
public enum Signal {
FINISHED
}
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection;
public interface StageWithPort {
String getId();
......@@ -20,7 +25,12 @@ public interface StageWithPort {
void onIsPipelineHead();
void onStart();
void onSignal(Signal signal, InputPort<?> inputPort);
/**
*
* @param invalidPortConnections
* <i>(Passed as parameter for performance reasons)</i>
*/
void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections);
}
......@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
/**
* A pipe implementation used to connect unconnected output ports.
......
......@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public interface IPipe<T> {
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
......
......@@ -10,7 +10,7 @@ import org.jctools.queues.spec.Preference;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public class SpScPipe<T> extends AbstractPipe<T> {
......
package teetime.variant.methodcallWithPorts.framework.core.signal;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
public interface Signal {
void trigger(AbstractStage stage);
}
package teetime.variant.methodcallWithPorts.framework.core.signal;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
public class StartingSignal implements Signal {
@Override
public void trigger(final AbstractStage stage) {
stage.onStarting();
}
}
package teetime.variant.methodcallWithPorts.framework.core.signal;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
public class TerminatingSignal implements Signal {
@Override
public void trigger(final AbstractStage stage) {
stage.onTerminating();
}
}
package teetime.variant.methodcallWithPorts.framework.core.signal;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection;
public class ValidatingSignal implements Signal {
private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>();
@Override
public void trigger(final AbstractStage stage) {
stage.onValidating(this.invalidPortConnections);
}
public List<InvalidPortConnection> getInvalidPortConnections() {
return invalidPortConnections;
}
}
package teetime.variant.methodcallWithPorts.framework.core;
package teetime.variant.methodcallWithPorts.framework.core.validation;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class InvalidPortConnection {
......
......@@ -29,9 +29,9 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> {
}
@Override
public void onStart() {
public void onStarting() {
this.resetTimestamp(System.nanoTime());
super.onStart();
super.onStarting();
}
private void computeElementDelay(final Long timestampInNs) {
......
......@@ -30,9 +30,9 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> {
}
@Override
public void onStart() {
public void onStarting() {
this.resetTimestamp(System.nanoTime());
super.onStart();
super.onStarting();
}
private void computeElementThroughput(final Long timestampInNs) {
......
......@@ -2,8 +2,8 @@ package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
public class Relay<T> extends ProducerStage<T> {
......@@ -15,8 +15,7 @@ public class Relay<T> extends ProducerStage<T> {
public void execute() {
T element = this.inputPort.receive();
if (null == element) {
// if (this.getInputPort().getPipe().isClosed()) {
if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) {
if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) {
this.setReschedulable(false);
assert 0 == this.inputPort.getPipe().size();
}
......@@ -27,16 +26,9 @@ public class Relay<T> extends ProducerStage<T> {
}
@Override
public void onStart() {
public void onStarting() {
this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe();
super.onStart();
}
@Override
public void onIsPipelineHead() {
// if (this.getInputPort().getPipe().isClosed()) {
// this.setReschedulable(false);
// }
super.onStarting();
}
public InputPort<T> getInputPort() {
......
......@@ -19,16 +19,16 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
/**
*
*
* This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
*
*
* @author Christian Wulf
*
*
* @since 1.10
*
*
* @param <T>
* the type of the input ports and the output port
*/
......@@ -61,16 +61,9 @@ public class Merger<T> extends AbstractStage {
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
signal.trigger(this);
if (this.finishedInputPorts == this.getInputPorts().length) {
this.outputPort.sendSignal(signal);
......
......@@ -49,7 +49,7 @@ public class DbReader extends ProducerStage<IMonitoringRecord> {
private volatile boolean running = true;
@Override
public void onStart() {
public void onStarting() {
try {
Class.forName(this.driverClassname).newInstance();
} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment