diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index f8e0697628c2ce5433e590792e84aed1b5d2bcf2..33be2f0cfb5b0773f794ffe0097a679d1e23b36b 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -12,8 +12,6 @@ import org.slf4j.LoggerFactory; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; -import teetime.framework.signal.OnStartingException; -import teetime.framework.signal.OnTerminatingException; import teetime.framework.validation.InvalidPortConnection; public abstract class AbstractStage implements Stage { @@ -119,14 +117,14 @@ public abstract class AbstractStage implements Stage { this.validateOutputPorts(invalidPortConnections); } - public void onStarting() throws OnStartingException { // NOPMD + public void onStarting() { this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); } - public void onTerminating() throws OnTerminatingException { // NOPMD + public void onTerminating() { // empty default implementation } diff --git a/src/main/java/teetime/framework/pipe/InterThreadPipe.java b/src/main/java/teetime/framework/pipe/InterThreadPipe.java index eb02e1818edb735a6a593f504103bfe53a6c9658..a38e59519e8e3a16c2027fb0b14ff757273b75a3 100644 --- a/src/main/java/teetime/framework/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/InterThreadPipe.java @@ -34,7 +34,7 @@ public abstract class InterThreadPipe extends AbstractPipe { } @Override - public void reportNewElement() {// NOPMD + public void reportNewElement() { // do nothing } } diff --git a/src/main/java/teetime/framework/signal/OnStartingException.java b/src/main/java/teetime/framework/signal/OnStartingException.java deleted file mode 100644 index 19e75a7257cb193f1b88f7e06bc7a6fe3c8f840d..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/signal/OnStartingException.java +++ /dev/null @@ -1,11 +0,0 @@ -package teetime.framework.signal; - -public class OnStartingException extends Exception { - - private static final long serialVersionUID = 2202821942516875502L; - - public OnStartingException(final String message, final Exception cause) { - super(message, cause); - } - -} diff --git a/src/main/java/teetime/framework/signal/OnTerminatingException.java b/src/main/java/teetime/framework/signal/OnTerminatingException.java deleted file mode 100644 index 5cc302110a15cb67da4d2af1b5850dbe2b34f3a6..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/signal/OnTerminatingException.java +++ /dev/null @@ -1,7 +0,0 @@ -package teetime.framework.signal; - -public class OnTerminatingException extends Exception { - - private static final long serialVersionUID = -3753319394964802310L; - -} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 89c164ec61a201cfd908bdeb33adc5fa1a33e096..d1b7aa5474236f782e6b8436f9934ed87149e9f6 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -6,11 +6,7 @@ public class StartingSignal implements ISignal { @Override public void trigger(final AbstractStage stage) { - try { - stage.onStarting(); - } catch (OnStartingException e) { - throw new RuntimeException(e); - } + stage.onStarting(); } } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 19bc9299d12c89488d4b481fc494b72aa1d13425..0aba57bae09d4045d6d76163c86a108206a8d660 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -6,11 +6,7 @@ public class TerminatingSignal implements ISignal { @Override public void trigger(final AbstractStage stage) { - try { - stage.onTerminating(); - } catch (OnTerminatingException e) { - throw new RuntimeException(e); - } + stage.onTerminating(); } } diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index d396330b627d0992960e8afe358cbab373aa6ef4..07b7d2896a04bb5338ff31f63aa09613b8ff9cdf 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -6,7 +6,6 @@ import java.util.concurrent.TimeUnit; import teetime.framework.ConsumerStage; import teetime.framework.OutputPort; -import teetime.framework.signal.OnTerminatingException; import teetime.util.StopWatch; public class Cache<T> extends ConsumerStage<T> { @@ -21,7 +20,8 @@ public class Cache<T> extends ConsumerStage<T> { } @Override - public void onTerminating() throws OnTerminatingException { + public void onTerminating() { + super.onTerminating(); this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 3a9ca3803c7a1c7e9a9787b2234ac509853a2252..f3b3875dce73a457cc1bec91c195944e7cc9689a 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -18,7 +18,6 @@ package teetime.stage; import java.util.List; import teetime.framework.ConsumerStage; -import teetime.framework.signal.OnTerminatingException; /** * @author Christian Wulf @@ -46,7 +45,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { } @Override - public void onTerminating() throws OnTerminatingException { + public void onTerminating() { super.onTerminating(); System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java index 2af7dcefc49a12a5db200cc1acd579d9ac5e02fc..452f75e3701d2bb29e74e156b152c5df1156d82f 100644 --- a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java @@ -6,7 +6,6 @@ import java.util.List; import teetime.framework.ConsumerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.signal.OnStartingException; public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { @@ -30,7 +29,7 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStarting() throws OnStartingException { + public void onStarting() { this.resetTimestamp(System.nanoTime()); super.onStarting(); } diff --git a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java index 4d6da35def9271db85206972905fa8840ddfc108..ea36801a4d4ccbb07868f9582f2fdaab7837f7a9 100644 --- a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java @@ -7,7 +7,6 @@ import java.util.concurrent.TimeUnit; import teetime.framework.ConsumerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.signal.OnStartingException; public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { @@ -31,7 +30,7 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { } @Override - public void onStarting() throws OnStartingException { + public void onStarting() { this.resetTimestamp(System.nanoTime()); super.onStarting(); } diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 6ca7d55ac576369f4eb9fee4548fa2188e2138dc..144b34eaf9d0e6dd7f611b54aa3df28f21e14a00 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -3,7 +3,6 @@ package teetime.stage; import teetime.framework.InputPort; import teetime.framework.ProducerStage; import teetime.framework.pipe.InterThreadPipe; -import teetime.framework.signal.OnStartingException; import teetime.framework.signal.TerminatingSignal; public class Relay<T> extends ProducerStage<T> { @@ -26,7 +25,7 @@ public class Relay<T> extends ProducerStage<T> { } @Override - public void onStarting() throws OnStartingException { + public void onStarting() { this.cachedCastedInputPipe = (InterThreadPipe) this.inputPort.getPipe(); super.onStarting(); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 18c6db7f8fcce15416c699ffa882c3a69a367f11..6c0ecac5b2b93a86500ee939623e110e1a80f4e9 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -6,7 +6,6 @@ import java.util.List; import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.signal.OnTerminatingException; public class Delay<T> extends AbstractStage { @@ -35,7 +34,7 @@ public class Delay<T> extends AbstractStage { } @Override - public void onTerminating() throws OnTerminatingException { + public void onTerminating() { super.onTerminating(); while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index 3e4b59bd8b6da3ad96d8564ae6ae7ed5929bbfbc..b80c8e5ef21356fde8b837e2611df330187b75c9 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -37,6 +37,15 @@ public class Distributor<T> extends ConsumerStage<T> { this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); } + @Override + public void onTerminating() { + super.onTerminating(); + // for (OutputPort<T> op : this.outputPortList) { + // op.getPipe().close(); + // System.out.println("End signal sent, size: " + op.getPipe().size()); + // } + } + public OutputPort<T> getNewOutputPort() { return this.createOutputPort(); } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 4cd5229f8a06c8a58b52975c02150c4571184775..870b22b90832de34648468ba03f7c67abc7c21b4 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -20,7 +20,6 @@ import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; -import teetime.framework.signal.OnTerminatingException; /** * @@ -63,9 +62,9 @@ public class Merger<T> extends AbstractStage { } @Override - public void onTerminating() throws OnTerminatingException { - this.finishedInputPorts++; + public void onTerminating() { super.onTerminating(); + this.finishedInputPorts++; } public IMergerStrategy<T> getStrategy() { diff --git a/src/main/java/teetime/stage/io/Printer.java b/src/main/java/teetime/stage/io/Printer.java index 2f273623e9aa4de0127b94c88578191f0d99fe24..40ee69ee69202d70a1d25d744e66b4e39d9b8843 100644 --- a/src/main/java/teetime/stage/io/Printer.java +++ b/src/main/java/teetime/stage/io/Printer.java @@ -21,8 +21,6 @@ import java.io.PrintStream; import java.io.UnsupportedEncodingException; import teetime.framework.ConsumerStage; -import teetime.framework.signal.OnStartingException; -import teetime.framework.signal.OnTerminatingException; /** * A filter to print objects to a configured stream @@ -88,13 +86,13 @@ public class Printer<T> extends ConsumerStage<T> { } @Override - public void onStarting() throws OnStartingException { + public void onStarting() { super.onStarting(); this.initializeStream(); } @Override - public void onTerminating() throws OnTerminatingException { + public void onTerminating() { this.closeStream(); super.onTerminating(); } diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 8390619bf7a34ba8c1459ad60e23802d3406a1a2..2cefc39dd013fb00c160ef34adb223a0439b8b97 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -20,13 +20,12 @@ import java.util.LinkedList; import teetime.framework.ConsumerStage; import teetime.framework.OutputPort; -import teetime.framework.signal.OnStartingException; import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; import teetime.stage.stringBuffer.util.KiekerHashMap; /** * @author Christian Wulf - * + * * @since 1.10 */ public class StringBufferFilter<T> extends ConsumerStage<T> { @@ -45,7 +44,7 @@ public class StringBufferFilter<T> extends ConsumerStage<T> { } @Override - public void onStarting() throws OnStartingException { + public void onStarting() { for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { handler.setLogger(this.logger); handler.setStringRepository(this.kiekerHashMap); diff --git a/src/performancetest/java/teetime/examples/loopStage/Countdown.java b/src/performancetest/java/teetime/examples/loopStage/Countdown.java index b0677ee5a99fce14455d24d6dfcd562e6d14cc4e..f70e3eb60a80eadb286bd2c78095040b26ccd2ed 100644 --- a/src/performancetest/java/teetime/examples/loopStage/Countdown.java +++ b/src/performancetest/java/teetime/examples/loopStage/Countdown.java @@ -3,7 +3,6 @@ package teetime.examples.loopStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.ProducerStage; -import teetime.framework.signal.OnStartingException; public class Countdown extends ProducerStage<Void> { @@ -18,9 +17,9 @@ public class Countdown extends ProducerStage<Void> { } @Override - public void onStarting() throws OnStartingException { - super.onStarting(); + public void onStarting() { this.countdownInputPort.getPipe().add(this.initialCountdown); + super.onStarting(); } @Override