From 0bdca0ef22de889eb339deca8019d5e41d67890c Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 25 Aug 2014 23:20:26 +0200 Subject: [PATCH] implemented unfinished generic signal concept; implemented example signals for starting, validating, and terminating --- .../framework/core/AbstractStage.java | 54 +++++++++---------- .../framework/core/OutputPort.java | 2 + .../framework/core/Pipeline.java | 42 +++++++-------- .../framework/core/RunnableStage.java | 28 +++++++++- .../framework/core/Signal.java | 6 --- .../framework/core/StageWithPort.java | 14 ++++- .../framework/core/pipe/DummyPipe.java | 2 +- .../framework/core/pipe/IPipe.java | 2 +- .../framework/core/pipe/IntraThreadPipe.java | 2 +- .../framework/core/pipe/SpScPipe.java | 2 +- .../framework/core/signal/Signal.java | 8 +++ .../framework/core/signal/StartingSignal.java | 12 +++++ .../core/signal/TerminatingSignal.java | 12 +++++ .../core/signal/ValidatingSignal.java | 22 ++++++++ .../InvalidPortConnection.java | 5 +- .../stage/ElementDelayMeasuringStage.java | 4 +- .../ElementThroughputMeasuringStage.java | 4 +- .../methodcallWithPorts/stage/Relay.java | 16 ++---- .../stage/basic/merger/Merger.java | 23 +++----- .../stage/io/DbReader.java | 2 +- .../methodcallWithPorts/stage/io/Printer.java | 9 ++-- .../stage/io/TCPReader.java | 4 +- .../stage/kieker/TCPReaderSink.java | 4 +- .../fileToRecord/BinaryFile2RecordFilter.java | 4 +- .../stringBuffer/StringBufferFilter.java | 4 +- .../MethodCallThroughputAnalysis17.java | 6 +-- submodules/JCTools | 2 +- 27 files changed, 180 insertions(+), 115 deletions(-) delete mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/Signal.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/StartingSignal.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/TerminatingSignal.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/ValidatingSignal.java rename src/main/java/teetime/variant/methodcallWithPorts/framework/core/{ => validation}/InvalidPortConnection.java (66%) diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 2fe100e5..ad33e815 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -1,7 +1,6 @@ package teetime.variant.methodcallWithPorts.framework.core; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.UUID; @@ -10,6 +9,8 @@ import org.slf4j.LoggerFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; +import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection; public abstract class AbstractStage implements StageWithPort { @@ -56,14 +57,6 @@ public abstract class AbstractStage implements StageWithPort { return true; } - @Override - public void onStart() { - this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); - this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); - - this.connectUnconnectedOutputPorts(); - } - @SuppressWarnings("unchecked") private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { @@ -74,11 +67,6 @@ public abstract class AbstractStage implements StageWithPort { } } - protected void onFinished() { - // empty default implementation - this.onIsPipelineHead(); - } - protected InputPort<?>[] getInputPorts() { return this.cachedInputPorts; } @@ -116,22 +104,31 @@ public abstract class AbstractStage implements StageWithPort { */ @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); - - switch (signal) { - case FINISHED: - this.onFinished(); - break; - default: - this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); - break; - } + this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); + + signal.trigger(this); for (OutputPort<?> outputPort : this.outputPortList) { outputPort.sendSignal(signal); } } + public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { + this.validateOutputPorts(invalidPortConnections); + } + + public void onStarting() { + this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); + this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); + + this.connectUnconnectedOutputPorts(); + } + + public void onTerminating() { + // empty default implementation + this.onIsPipelineHead(); + } + protected <T> InputPort<T> createInputPort() { InputPort<T> inputPort = new InputPort<T>(this); // inputPort.setType(type); // TODO set type for input port @@ -146,9 +143,8 @@ public abstract class AbstractStage implements StageWithPort { return outputPort; } - public List<InvalidPortConnection> validateOutputPorts() { - List<InvalidPortConnection> invalidOutputPortMessages = new LinkedList<InvalidPortConnection>(); - + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : this.getOutputPorts()) { IPipe<?> pipe = outputPort.getPipe(); if (null != pipe) { // if output port is connected with another one @@ -156,12 +152,10 @@ public abstract class AbstractStage implements StageWithPort { Class<?> targetPortType = pipe.getTargetPort().getType(); if (null == sourcePortType || !sourcePortType.equals(targetPortType)) { InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort()); - invalidOutputPortMessages.add(invalidPortConnection); + invalidPortConnections.add(invalidPortConnection); } } } - - return invalidOutputPortMessages; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index b29c1246..97d66d05 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -1,5 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; + public class OutputPort<T> extends AbstractPort<T> { /** diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index f20f891e..018f39e0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -5,13 +5,17 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal; +import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection; + import kieker.common.logging.Log; import kieker.common.logging.LogFactory; /** - * + * * @author Christian Wulf - * + * * @param <FirstStage> * @param <LastStage> */ @@ -28,13 +32,7 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>(); private LastStage lastStage; - // BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to - // multiple input ports? - private StageWithPort[] stages; private StageWithPort parentStage; - // private int startIndex; - - private int firstStageIndex; // private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>(); @@ -70,7 +68,7 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW @Override public void executeWithPorts() { - StageWithPort headStage = this.stages[this.firstStageIndex]; + StageWithPort headStage = this.firstStage; // do { headStage.executeWithPorts(); @@ -106,16 +104,16 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW // do nothing } - @Override - public void onStart() { + @Deprecated + public void onStarting() { int size = 1 + this.intermediateStages.size() + 1; - this.stages = new StageWithPort[size]; - this.stages[0] = this.firstStage; + StageWithPort[] stages = new StageWithPort[size]; + stages[0] = this.firstStage; for (int i = 0; i < this.intermediateStages.size(); i++) { StageWithPort stage = this.intermediateStages.get(i); - this.stages[1 + i] = stage; + stages[1 + i] = stage; } - this.stages[this.stages.length - 1] = this.lastStage; + stages[stages.length - 1] = this.lastStage; // for (int i = 0; i < this.stages.length; i++) { // StageWithPort<?, ?> stage = this.stages[i]; @@ -129,8 +127,8 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW // } // this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); - for (StageWithPort stage : this.stages) { - stage.onStart(); + for (StageWithPort stage : stages) { + stage.onSignal(new StartingSignal(), null); } } @@ -146,14 +144,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW @Override public boolean isReschedulable() { - // return this.reschedulable; return this.firstStage.isReschedulable(); } - // public void setReschedulable(final boolean reschedulable) { - // this.reschedulable = reschedulable; - // } - @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { this.firstStage.onSignal(signal, inputPort); @@ -167,4 +160,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW return this.lastStage; } + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + // do nothing + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index f2ff6c73..54bc6088 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -3,10 +3,15 @@ package teetime.variant.methodcallWithPorts.framework.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal; +import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; +import teetime.variant.methodcallWithPorts.framework.core.signal.ValidatingSignal; + public class RunnableStage implements Runnable { private final StageWithPort stage; private final Logger logger; + private boolean validationEnabled; public RunnableStage(final StageWithPort stage) { this.stage = stage; @@ -17,14 +22,25 @@ public class RunnableStage implements Runnable { public void run() { this.logger.debug("Executing runnable stage..."); + if (this.validationEnabled) { + ValidatingSignal validatingSignal = new ValidatingSignal(); + this.stage.onSignal(validatingSignal, null); + if (validatingSignal.getInvalidPortConnections().size() > 0) { + // throw new RuntimeException(message); + // TODO implement what to do on validation messages + } + } + try { - this.stage.onStart(); + StartingSignal startingSignal = new StartingSignal(); + this.stage.onSignal(startingSignal, null); do { this.stage.executeWithPorts(); } while (this.stage.isReschedulable()); - this.stage.onSignal(Signal.FINISHED, null); + TerminatingSignal terminatingSignal = new TerminatingSignal(); + this.stage.onSignal(terminatingSignal, null); } catch (RuntimeException e) { this.logger.error("Terminating thread due to the following exception: ", e); @@ -33,4 +49,12 @@ public class RunnableStage implements Runnable { this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); } + + public boolean isValidationEnabled() { + return this.validationEnabled; + } + + public void setValidationEnabled(final boolean validationEnabled) { + this.validationEnabled = validationEnabled; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java deleted file mode 100644 index 0e3eadce..00000000 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Signal.java +++ /dev/null @@ -1,6 +0,0 @@ -package teetime.variant.methodcallWithPorts.framework.core; - -public enum Signal { - FINISHED - -} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 72563660..15733038 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -1,5 +1,10 @@ package teetime.variant.methodcallWithPorts.framework.core; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; +import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection; + public interface StageWithPort { String getId(); @@ -20,7 +25,12 @@ public interface StageWithPort { void onIsPipelineHead(); - void onStart(); - void onSignal(Signal signal, InputPort<?> inputPort); + + /** + * + * @param invalidPortConnections + * <i>(Passed as parameter for performance reasons)</i> + */ + void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java index ee90ec5d..498ada08 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; /** * A pipe implementation used to connect unconnected output ports. diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index e7d500b5..0bedb314 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public interface IPipe<T> { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java index 59af28e5..116d2903 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -1,6 +1,6 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; -import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index ef956895..8ceb2927 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -10,7 +10,7 @@ import org.jctools.queues.spec.Preference; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public class SpScPipe<T> extends AbstractPipe<T> { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/Signal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/Signal.java new file mode 100644 index 00000000..93a47871 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/Signal.java @@ -0,0 +1,8 @@ +package teetime.variant.methodcallWithPorts.framework.core.signal; + +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; + +public interface Signal { + + void trigger(AbstractStage stage); +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/StartingSignal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/StartingSignal.java new file mode 100644 index 00000000..9e82408c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/StartingSignal.java @@ -0,0 +1,12 @@ +package teetime.variant.methodcallWithPorts.framework.core.signal; + +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; + +public class StartingSignal implements Signal { + + @Override + public void trigger(final AbstractStage stage) { + stage.onStarting(); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/TerminatingSignal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/TerminatingSignal.java new file mode 100644 index 00000000..b0ea58c0 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/TerminatingSignal.java @@ -0,0 +1,12 @@ +package teetime.variant.methodcallWithPorts.framework.core.signal; + +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; + +public class TerminatingSignal implements Signal { + + @Override + public void trigger(final AbstractStage stage) { + stage.onTerminating(); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/ValidatingSignal.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/ValidatingSignal.java new file mode 100644 index 00000000..fad95a43 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/signal/ValidatingSignal.java @@ -0,0 +1,22 @@ +package teetime.variant.methodcallWithPorts.framework.core.signal; + +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection; + +public class ValidatingSignal implements Signal { + + private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>(); + + @Override + public void trigger(final AbstractStage stage) { + stage.onValidating(this.invalidPortConnections); + } + + public List<InvalidPortConnection> getInvalidPortConnections() { + return invalidPortConnections; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/validation/InvalidPortConnection.java similarity index 66% rename from src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/validation/InvalidPortConnection.java index d753230d..51239fed 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/validation/InvalidPortConnection.java @@ -1,4 +1,7 @@ -package teetime.variant.methodcallWithPorts.framework.core; +package teetime.variant.methodcallWithPorts.framework.core.validation; + +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public class InvalidPortConnection { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java index 72213b5d..2ab1fa47 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java @@ -29,9 +29,9 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStart() { + public void onStarting() { this.resetTimestamp(System.nanoTime()); - super.onStart(); + super.onStarting(); } private void computeElementDelay(final Long timestampInNs) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index a73ec83c..33ad7d5f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -30,9 +30,9 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStart() { + public void onStarting() { this.resetTimestamp(System.nanoTime()); - super.onStart(); + super.onStarting(); } private void computeElementThroughput(final Long timestampInNs) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 574f822a..d0e281fd 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -2,8 +2,8 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; -import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; public class Relay<T> extends ProducerStage<T> { @@ -15,8 +15,7 @@ public class Relay<T> extends ProducerStage<T> { public void execute() { T element = this.inputPort.receive(); if (null == element) { - // if (this.getInputPort().getPipe().isClosed()) { - if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) { + if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { this.setReschedulable(false); assert 0 == this.inputPort.getPipe().size(); } @@ -27,16 +26,9 @@ public class Relay<T> extends ProducerStage<T> { } @Override - public void onStart() { + public void onStarting() { this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe(); - super.onStart(); - } - - @Override - public void onIsPipelineHead() { - // if (this.getInputPort().getPipe().isClosed()) { - // this.setReschedulable(false); - // } + super.onStarting(); } public InputPort<T> getInputPort() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index c28d0bcf..7f469d4f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -19,16 +19,16 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; /** - * + * * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. - * + * * @author Christian Wulf - * + * * @since 1.10 - * + * * @param <T> * the type of the input ports and the output port */ @@ -61,16 +61,9 @@ public class Merger<T> extends AbstractStage { @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); - - switch (signal) { - case FINISHED: - this.onFinished(); - break; - default: - this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); - break; - } + this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); + + signal.trigger(this); if (this.finishedInputPorts == this.getInputPorts().length) { this.outputPort.sendSignal(signal); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java index d8746827..497daeab 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java @@ -49,7 +49,7 @@ public class DbReader extends ProducerStage<IMonitoringRecord> { private volatile boolean running = true; @Override - public void onStart() { + public void onStarting() { try { Class.forName(this.driverClassname).newInstance(); } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java index 8cf9a988..3bed4e2b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java @@ -25,7 +25,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; /** * @author Matthias Rohr, Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ @Description("A filter to print objects to a configured stream") @@ -86,14 +86,15 @@ public class Printer<T> extends ConsumerStage<T> { } @Override - public void onStart() { + public void onStarting() { + super.onStarting(); this.initializeStream(); } @Override - protected void onFinished() { + public void onTerminating() { this.closeStream(); - super.onFinished(); + super.onTerminating(); } private void initializeStream() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 6dca7449..decfc7f3 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -76,13 +76,13 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } @Override - public void onStart() { + public void onStarting() { this.recordFactory = new RecordFactory(); this.register(); this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); this.tcpStringReader.start(); - super.onStart(); + super.onStarting(); } private void register() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java index a09cdea3..af0c71fa 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java @@ -78,7 +78,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } @Override - public void onStart() { + public void onStarting() { this.executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -88,7 +88,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); this.tcpStringReader.start(); - super.onStart(); + super.onStarting(); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java index 9119e31b..6e9769c6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java @@ -58,9 +58,9 @@ public class BinaryFile2RecordFilter extends ConsumerStage<File> { } @Override - public void onStart() { + public void onStarting() { this.recordFromBinaryFileCreator = new RecordFromBinaryFileCreator(this.logger, this.classNameRegistryRepository); - super.onStart(); + super.onStarting(); } public ClassNameRegistryRepository getClassNameRegistryRepository() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java index d7c6f3c4..a566f7a5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java @@ -44,12 +44,12 @@ public class StringBufferFilter<T> extends ConsumerStage<T> { } @Override - public void onStart() { + public void onStarting() { for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { handler.setLogger(this.logger); handler.setStringRepository(this.kiekerHashMap); } - super.onStart(); + super.onStarting(); } private T handle(final T object) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index 652ccbdc..9a4c84ae 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -24,11 +24,11 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; +import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; @@ -40,7 +40,7 @@ import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis17 extends Analysis { @@ -160,7 +160,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { startPipe.add(this.inputObjectCreator.create()); } // startPipe.close(); - startPipe.setSignal(Signal.FINISHED); + startPipe.setSignal(new TerminatingSignal()); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/submodules/JCTools b/submodules/JCTools index 75998aa2..88e1e25f 160000 --- a/submodules/JCTools +++ b/submodules/JCTools @@ -1 +1 @@ -Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e +Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 -- GitLab