From 6193418d255a34a18f41c35bf482b7767d4af85b Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 3 Nov 2014 08:35:21 +0100 Subject: [PATCH] added "throws exception" to signal method headers --- src/main/java/teetime/framework/AbstractStage.java | 6 ++++-- .../java/teetime/framework/pipe/InterThreadPipe.java | 2 +- .../teetime/framework/signal/OnStartingException.java | 11 +++++++++++ .../framework/signal/OnTerminatingException.java | 7 +++++++ .../java/teetime/framework/signal/StartingSignal.java | 6 +++++- .../teetime/framework/signal/TerminatingSignal.java | 6 +++++- src/main/java/teetime/stage/Cache.java | 4 ++-- src/main/java/teetime/stage/CollectorSink.java | 3 ++- .../teetime/stage/ElementDelayMeasuringStage.java | 3 ++- .../stage/ElementThroughputMeasuringStage.java | 3 ++- src/main/java/teetime/stage/Relay.java | 3 ++- src/main/java/teetime/stage/basic/Delay.java | 3 ++- .../teetime/stage/basic/distributor/Distributor.java | 9 --------- src/main/java/teetime/stage/basic/merger/Merger.java | 5 +++-- src/main/java/teetime/stage/io/Printer.java | 6 ++++-- .../stage/stringBuffer/StringBufferFilter.java | 5 +++-- .../java/teetime/examples/loopStage/Countdown.java | 5 +++-- 17 files changed, 58 insertions(+), 29 deletions(-) create mode 100644 src/main/java/teetime/framework/signal/OnStartingException.java create mode 100644 src/main/java/teetime/framework/signal/OnTerminatingException.java diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 33be2f0c..f8e06976 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -12,6 +12,8 @@ import org.slf4j.LoggerFactory; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; +import teetime.framework.signal.OnStartingException; +import teetime.framework.signal.OnTerminatingException; import teetime.framework.validation.InvalidPortConnection; public abstract class AbstractStage implements Stage { @@ -117,14 +119,14 @@ public abstract class AbstractStage implements Stage { this.validateOutputPorts(invalidPortConnections); } - public void onStarting() { + public void onStarting() throws OnStartingException { // NOPMD this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); } - public void onTerminating() { + public void onTerminating() throws OnTerminatingException { // NOPMD // empty default implementation } diff --git a/src/main/java/teetime/framework/pipe/InterThreadPipe.java b/src/main/java/teetime/framework/pipe/InterThreadPipe.java index a38e5951..eb02e181 100644 --- a/src/main/java/teetime/framework/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/InterThreadPipe.java @@ -34,7 +34,7 @@ public abstract class InterThreadPipe extends AbstractPipe { } @Override - public void reportNewElement() { + public void reportNewElement() {// NOPMD // do nothing } } diff --git a/src/main/java/teetime/framework/signal/OnStartingException.java b/src/main/java/teetime/framework/signal/OnStartingException.java new file mode 100644 index 00000000..19e75a72 --- /dev/null +++ b/src/main/java/teetime/framework/signal/OnStartingException.java @@ -0,0 +1,11 @@ +package teetime.framework.signal; + +public class OnStartingException extends Exception { + + private static final long serialVersionUID = 2202821942516875502L; + + public OnStartingException(final String message, final Exception cause) { + super(message, cause); + } + +} diff --git a/src/main/java/teetime/framework/signal/OnTerminatingException.java b/src/main/java/teetime/framework/signal/OnTerminatingException.java new file mode 100644 index 00000000..5cc30211 --- /dev/null +++ b/src/main/java/teetime/framework/signal/OnTerminatingException.java @@ -0,0 +1,7 @@ +package teetime.framework.signal; + +public class OnTerminatingException extends Exception { + + private static final long serialVersionUID = -3753319394964802310L; + +} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index d1b7aa54..89c164ec 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -6,7 +6,11 @@ public class StartingSignal implements ISignal { @Override public void trigger(final AbstractStage stage) { - stage.onStarting(); + try { + stage.onStarting(); + } catch (OnStartingException e) { + throw new RuntimeException(e); + } } } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 0aba57ba..19bc9299 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -6,7 +6,11 @@ public class TerminatingSignal implements ISignal { @Override public void trigger(final AbstractStage stage) { - stage.onTerminating(); + try { + stage.onTerminating(); + } catch (OnTerminatingException e) { + throw new RuntimeException(e); + } } } diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index 07b7d289..d396330b 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit; import teetime.framework.ConsumerStage; import teetime.framework.OutputPort; +import teetime.framework.signal.OnTerminatingException; import teetime.util.StopWatch; public class Cache<T> extends ConsumerStage<T> { @@ -20,8 +21,7 @@ public class Cache<T> extends ConsumerStage<T> { } @Override - public void onTerminating() { - super.onTerminating(); + public void onTerminating() throws OnTerminatingException { this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index f3b3875d..3a9ca380 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -18,6 +18,7 @@ package teetime.stage; import java.util.List; import teetime.framework.ConsumerStage; +import teetime.framework.signal.OnTerminatingException; /** * @author Christian Wulf @@ -45,7 +46,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { } @Override - public void onTerminating() { + public void onTerminating() throws OnTerminatingException { super.onTerminating(); System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java index 452f75e3..2af7dcef 100644 --- a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java @@ -6,6 +6,7 @@ import java.util.List; import teetime.framework.ConsumerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.signal.OnStartingException; public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { @@ -29,7 +30,7 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStarting() { + public void onStarting() throws OnStartingException { this.resetTimestamp(System.nanoTime()); super.onStarting(); } diff --git a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java index ea36801a..4d6da35d 100644 --- a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java @@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit; import teetime.framework.ConsumerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.signal.OnStartingException; public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { @@ -30,7 +31,7 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStarting() { + public void onStarting() throws OnStartingException { this.resetTimestamp(System.nanoTime()); super.onStarting(); } diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 144b34ea..6ca7d55a 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -3,6 +3,7 @@ package teetime.stage; import teetime.framework.InputPort; import teetime.framework.ProducerStage; import teetime.framework.pipe.InterThreadPipe; +import teetime.framework.signal.OnStartingException; import teetime.framework.signal.TerminatingSignal; public class Relay<T> extends ProducerStage<T> { @@ -25,7 +26,7 @@ public class Relay<T> extends ProducerStage<T> { } @Override - public void onStarting() { + public void onStarting() throws OnStartingException { this.cachedCastedInputPipe = (InterThreadPipe) this.inputPort.getPipe(); super.onStarting(); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 6c0ecac5..18c6db7f 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -6,6 +6,7 @@ import java.util.List; import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; +import teetime.framework.signal.OnTerminatingException; public class Delay<T> extends AbstractStage { @@ -34,7 +35,7 @@ public class Delay<T> extends AbstractStage { } @Override - public void onTerminating() { + public void onTerminating() throws OnTerminatingException { super.onTerminating(); while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index b80c8e5e..3e4b59bd 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -37,15 +37,6 @@ public class Distributor<T> extends ConsumerStage<T> { this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); } - @Override - public void onTerminating() { - super.onTerminating(); - // for (OutputPort<T> op : this.outputPortList) { - // op.getPipe().close(); - // System.out.println("End signal sent, size: " + op.getPipe().size()); - // } - } - public OutputPort<T> getNewOutputPort() { return this.createOutputPort(); } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 870b22b9..4cd5229f 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -20,6 +20,7 @@ import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; +import teetime.framework.signal.OnTerminatingException; /** * @@ -62,9 +63,9 @@ public class Merger<T> extends AbstractStage { } @Override - public void onTerminating() { - super.onTerminating(); + public void onTerminating() throws OnTerminatingException { this.finishedInputPorts++; + super.onTerminating(); } public IMergerStrategy<T> getStrategy() { diff --git a/src/main/java/teetime/stage/io/Printer.java b/src/main/java/teetime/stage/io/Printer.java index 40ee69ee..2f273623 100644 --- a/src/main/java/teetime/stage/io/Printer.java +++ b/src/main/java/teetime/stage/io/Printer.java @@ -21,6 +21,8 @@ import java.io.PrintStream; import java.io.UnsupportedEncodingException; import teetime.framework.ConsumerStage; +import teetime.framework.signal.OnStartingException; +import teetime.framework.signal.OnTerminatingException; /** * A filter to print objects to a configured stream @@ -86,13 +88,13 @@ public class Printer<T> extends ConsumerStage<T> { } @Override - public void onStarting() { + public void onStarting() throws OnStartingException { super.onStarting(); this.initializeStream(); } @Override - public void onTerminating() { + public void onTerminating() throws OnTerminatingException { this.closeStream(); super.onTerminating(); } diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 2cefc39d..8390619b 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -20,12 +20,13 @@ import java.util.LinkedList; import teetime.framework.ConsumerStage; import teetime.framework.OutputPort; +import teetime.framework.signal.OnStartingException; import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; import teetime.stage.stringBuffer.util.KiekerHashMap; /** * @author Christian Wulf - * + * * @since 1.10 */ public class StringBufferFilter<T> extends ConsumerStage<T> { @@ -44,7 +45,7 @@ public class StringBufferFilter<T> extends ConsumerStage<T> { } @Override - public void onStarting() { + public void onStarting() throws OnStartingException { for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { handler.setLogger(this.logger); handler.setStringRepository(this.kiekerHashMap); diff --git a/src/performancetest/java/teetime/examples/loopStage/Countdown.java b/src/performancetest/java/teetime/examples/loopStage/Countdown.java index f70e3eb6..b0677ee5 100644 --- a/src/performancetest/java/teetime/examples/loopStage/Countdown.java +++ b/src/performancetest/java/teetime/examples/loopStage/Countdown.java @@ -3,6 +3,7 @@ package teetime.examples.loopStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.ProducerStage; +import teetime.framework.signal.OnStartingException; public class Countdown extends ProducerStage<Void> { @@ -17,9 +18,9 @@ public class Countdown extends ProducerStage<Void> { } @Override - public void onStarting() { - this.countdownInputPort.getPipe().add(this.initialCountdown); + public void onStarting() throws OnStartingException { super.onStarting(); + this.countdownInputPort.getPipe().add(this.initialCountdown); } @Override -- GitLab