diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index b56eb2ae89e375c5217fb1b1c5b2f5b019cba595..b541b0aa415e4764db5e340c802d80ce15cd025b 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -40,23 +40,6 @@ public abstract class AbstractStage extends Stage { this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); } - /** - * Sends the given <code>element</code> using the default output port - * - * @param element - * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) - */ - protected final <O> boolean send(final OutputPort<O> outputPort, final O element) { - if (!outputPort.send(element)) { - return false; - } - - outputPort.reportNewElement(); - - return true; - // return outputPort.send(element); - } - private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 45a8de638a765f72d408dc5e84f856feac624ab6..588669bb10cc4cb47f3d693d2c3041f7b3ecb899 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -14,15 +14,15 @@ public final class OutputPort<T> extends AbstractPort<T> { * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) */ public boolean send(final T element) { - return this.pipe.add(element); + boolean added = this.pipe.add(element); + if (added) { + this.pipe.reportNewElement(); + } + return added; } public void sendSignal(final ISignal signal) { this.pipe.sendSignal(signal); } - public void reportNewElement() { - this.pipe.reportNewElement(); - } - } diff --git a/src/main/java/teetime/stage/ByteArray2String.java b/src/main/java/teetime/stage/ByteArray2String.java index eb27f89b5cd68cbac09ef5cf6a9ed806cf5205d8..aca5a438e085a277ea780f709695104387e25f0e 100644 --- a/src/main/java/teetime/stage/ByteArray2String.java +++ b/src/main/java/teetime/stage/ByteArray2String.java @@ -11,7 +11,7 @@ public class ByteArray2String extends AbstractConsumerStage<byte[]> { @Override protected void execute(final byte[] element) { - this.send(this.outputPort, new String(element, Charset.forName("UTF-8"))); + outputPort.send(new String(element, Charset.forName("UTF-8"))); } public OutputPort<? extends String> getOutputPort() { diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index df63a6232abbbb6963f17761870ca9bd0901a236..a5991589b69f63ec746ea15e3dc09d021a51be61 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -25,7 +25,7 @@ public class Cache<T> extends AbstractConsumerStage<T> { StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (T cachedElement : this.cachedObjects) { - this.send(this.outputPort, cachedElement); + outputPort.send(cachedElement); } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); diff --git a/src/main/java/teetime/stage/CipherByteArray.java b/src/main/java/teetime/stage/CipherByteArray.java index 62d8db9907bef8f6e7fe470358f04fcf7a55c910..f91682f169581d321f96ba6b96292638698546c6 100644 --- a/src/main/java/teetime/stage/CipherByteArray.java +++ b/src/main/java/teetime/stage/CipherByteArray.java @@ -75,7 +75,7 @@ public class CipherByteArray extends AbstractConsumerStage<byte[]> { e.printStackTrace(); } - this.send(this.outputPort, output); + outputPort.send(output); } public OutputPort<? extends byte[]> getOutputPort() { diff --git a/src/main/java/teetime/stage/Clock.java b/src/main/java/teetime/stage/Clock.java index a536bc90803d931da45d499a864a422d4f286bd9..96a9544184231de7248151b896d6c5a9ead78750 100644 --- a/src/main/java/teetime/stage/Clock.java +++ b/src/main/java/teetime/stage/Clock.java @@ -25,7 +25,7 @@ public class Clock extends AbstractProducerStage<Long> { } // this.logger.debug("Emitting timestamp"); - this.send(this.outputPort, this.getCurrentTimeInNs()); + outputPort.send(this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/stage/Counter.java b/src/main/java/teetime/stage/Counter.java index 1dc0ccd2ec6e0d64f95f5811aad16dcabe81bd47..10632de710b00dbe9d12d3a701feb09fd69cd3d0 100644 --- a/src/main/java/teetime/stage/Counter.java +++ b/src/main/java/teetime/stage/Counter.java @@ -13,7 +13,7 @@ public class Counter<T> extends AbstractConsumerStage<T> { protected void execute(final T element) { this.numElementsPassed++; // this.logger.debug("count: " + this.numElementsPassed); - this.send(this.outputPort, element); + outputPort.send(element); } // BETTER find a solution w/o any thread-safe code in this stage diff --git a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java index 16ae63749b53654135f4613f66e4b9e1213134cd..5ba5719466bbd1e1a06bf7c4aa97ca8b11e0e622 100644 --- a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java @@ -25,7 +25,7 @@ public class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T> { } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java index de5958b14cea672b878afd2424e72404284cc036..fe3821c7e2b6bc5dfd83aa5e452a635f68f6cb83 100644 --- a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java @@ -26,7 +26,7 @@ public class ElementThroughputMeasuringStage<T> extends AbstractConsumerStage<T> } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/FileExtensionSwitch.java b/src/main/java/teetime/stage/FileExtensionSwitch.java index 1ab666f26b1156c94d66ca6bd75072080d51549a..b58f2e9017e67094feffcfa075215a66a49bf86d 100644 --- a/src/main/java/teetime/stage/FileExtensionSwitch.java +++ b/src/main/java/teetime/stage/FileExtensionSwitch.java @@ -18,9 +18,7 @@ public class FileExtensionSwitch extends AbstractConsumerStage<File> { String fileExtension = Files.getFileExtension(file.getAbsolutePath()); this.logger.debug("fileExtension: " + fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); - if (outputPort != null) { - this.send(outputPort, file); - } + outputPort.send(file); } public OutputPort<File> addFileExtension(String fileExtension) { diff --git a/src/main/java/teetime/stage/InitialElementProducer.java b/src/main/java/teetime/stage/InitialElementProducer.java index 1b167e481ee5ec46a2a7b768f3be705c268ea768..12af6bf098280e0b1d1d6ffd88e30e3eadec7df9 100644 --- a/src/main/java/teetime/stage/InitialElementProducer.java +++ b/src/main/java/teetime/stage/InitialElementProducer.java @@ -13,7 +13,7 @@ public class InitialElementProducer<T> extends AbstractProducerStage<T> { @Override protected void execute() { for (T e : this.elements) { - this.send(this.outputPort, e); + outputPort.send(e); } this.terminate(); } diff --git a/src/main/java/teetime/stage/InstanceCounter.java b/src/main/java/teetime/stage/InstanceCounter.java index 9db60208c598f41881a99adf38134c07479a9585..7b5b4a0d88402a7966a1d75b5f852afeb0d9c40f 100644 --- a/src/main/java/teetime/stage/InstanceCounter.java +++ b/src/main/java/teetime/stage/InstanceCounter.java @@ -20,7 +20,7 @@ public class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { this.counter++; } - this.send(this.outputPort, element); + outputPort.send(element); } public int getCounter() { diff --git a/src/main/java/teetime/stage/InstanceOfFilter.java b/src/main/java/teetime/stage/InstanceOfFilter.java index d2e381c078cdc427b62a3ba3803316b9883616e2..a32b6963a2bd2b7a7f084489a237c123c7692ff5 100644 --- a/src/main/java/teetime/stage/InstanceOfFilter.java +++ b/src/main/java/teetime/stage/InstanceOfFilter.java @@ -21,7 +21,7 @@ public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { @Override protected void execute(final I element) { if (this.type.isInstance(element)) { - this.send(this.outputPort, (O) element); + outputPort.send((O) element); } else { // swallow up the element if (this.logger.isDebugEnabled()) { this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); diff --git a/src/main/java/teetime/stage/IterableProducer.java b/src/main/java/teetime/stage/IterableProducer.java index 398d6e6158107205278bd1c69c0b127459525afe..f83f1201e9951f92e2f0d34050660b8580647a8a 100644 --- a/src/main/java/teetime/stage/IterableProducer.java +++ b/src/main/java/teetime/stage/IterableProducer.java @@ -13,7 +13,7 @@ public class IterableProducer<O extends Iterable<T>, T> extends AbstractProducer @Override protected void execute() { for (T i : iter) { - this.send(this.outputPort, i); + outputPort.send(i); } } diff --git a/src/main/java/teetime/stage/NoopFilter.java b/src/main/java/teetime/stage/NoopFilter.java index a023df02b285871aeccc90da87716db49feb1a29..f42247ffa82c604d8bb3882796020c43f360e9c0 100644 --- a/src/main/java/teetime/stage/NoopFilter.java +++ b/src/main/java/teetime/stage/NoopFilter.java @@ -29,7 +29,7 @@ public class NoopFilter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/ObjectProducer.java b/src/main/java/teetime/stage/ObjectProducer.java index 7b9b76d575e9cb82c8a92e5ca07ecbb926b468b4..63b4a9c41049aa0a925a5e87c733779de48a6ae3 100644 --- a/src/main/java/teetime/stage/ObjectProducer.java +++ b/src/main/java/teetime/stage/ObjectProducer.java @@ -57,7 +57,7 @@ public class ObjectProducer<T> extends AbstractProducerStage<T> { T newObject = this.inputObjectCreator.create(); this.numInputObjects--; - this.send(this.outputPort, newObject); + outputPort.send(newObject); if (this.numInputObjects == 0) { this.terminate(); diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index e42808be295ba2541d658cedd67c99d04312b8ef..08d2ac811e29cbc8f4c1ac9dcc70e6db06deba4a 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -21,7 +21,7 @@ public class Relay<T> extends AbstractProducerStage<T> { Thread.yield(); return; } - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/StartTimestampFilter.java b/src/main/java/teetime/stage/StartTimestampFilter.java index 4f5b50eaa14c4fb98ad8a494c41d6a558f82f59b..67a0cb5d96db0af47e921630a8224bbe6d1f1113 100644 --- a/src/main/java/teetime/stage/StartTimestampFilter.java +++ b/src/main/java/teetime/stage/StartTimestampFilter.java @@ -31,7 +31,7 @@ public class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> @Override protected void execute(final TimestampObject element) { element.setStartTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/StopTimestampFilter.java b/src/main/java/teetime/stage/StopTimestampFilter.java index 49385bf2e23924c8cea16d268d78e8eedf8fdfeb..03eb05732a2af30676bb7c9d2220c30d264c2997 100644 --- a/src/main/java/teetime/stage/StopTimestampFilter.java +++ b/src/main/java/teetime/stage/StopTimestampFilter.java @@ -31,7 +31,7 @@ public class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> @Override protected void execute(final TimestampObject element) { element.setStopTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/Tokenizer.java b/src/main/java/teetime/stage/Tokenizer.java index 96064e6e0f5009bd6512c13714e86390675af7fc..2cbb80b09990fbfe71b1d9c0ea1aa30c0b82b233 100644 --- a/src/main/java/teetime/stage/Tokenizer.java +++ b/src/main/java/teetime/stage/Tokenizer.java @@ -18,7 +18,7 @@ public class Tokenizer extends AbstractConsumerStage<String> { protected void execute(final String element) { StringTokenizer st = new StringTokenizer(element, this.regex); while (st.hasMoreTokens()) { - this.send(this.outputPort, st.nextToken()); + outputPort.send(st.nextToken()); } } diff --git a/src/main/java/teetime/stage/ZipByteArray.java b/src/main/java/teetime/stage/ZipByteArray.java index 36dc9dbef3e3db0328e9b77f07a4230f305b8ca4..ea3d3d9d3d030caa5cc2c3fd08a0c1fdb09ed2c5 100644 --- a/src/main/java/teetime/stage/ZipByteArray.java +++ b/src/main/java/teetime/stage/ZipByteArray.java @@ -40,7 +40,7 @@ public class ZipByteArray extends AbstractConsumerStage<byte[]> { } catch (Exception e) { e.printStackTrace(); } - this.send(this.outputPort, cache); + outputPort.send(cache); } private byte[] compress(final byte[] data) throws IOException { diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 4682ba64fe7a9afa2b6258fc132a090c0ae1c461..d2cf63b4a2225c5cd204abfeffaf2c37a8b7efed 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -29,7 +29,7 @@ public class Delay<T> extends AbstractStage { while (!bufferedElements.isEmpty()) { element = bufferedElements.remove(0); - this.send(this.outputPort, element); + outputPort.send(element); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index bde21d7f9c3028ca6fa093c4aab7e2f93ca67735..92d370468f7b515f507d36cf1724e868d032a7a9 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -53,7 +53,7 @@ public class Merger<T> extends AbstractStage { return; } - this.send(this.outputPort, token); + outputPort.send(token); } /** diff --git a/src/main/java/teetime/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/stage/io/Directory2FilesFilter.java index 426cad8c76d5c88ee51233c09bf0c39668c76580..4fe0d8e3c29ad159df974edbe152b771bd0a74b1 100644 --- a/src/main/java/teetime/stage/io/Directory2FilesFilter.java +++ b/src/main/java/teetime/stage/io/Directory2FilesFilter.java @@ -79,7 +79,7 @@ public class Directory2FilesFilter extends AbstractConsumerStage<File> { } for (final File file : inputFiles) { - this.send(this.outputPort, file); + outputPort.send(file); } } diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java index 17509009179ea1a7a93ef52caddcc1265d1be8cd..1319b4fb6959e38513ba3c8e101174fa708f6f41 100644 --- a/src/main/java/teetime/stage/io/File2ByteArray.java +++ b/src/main/java/teetime/stage/io/File2ByteArray.java @@ -16,7 +16,7 @@ public class File2ByteArray extends AbstractConsumerStage<File> { protected void execute(final File element) { try { byte[] cache = Files.toByteArray(element); - this.send(this.outputPort, cache); + outputPort.send(cache); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/src/main/java/teetime/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/stage/io/File2TextLinesFilter.java index 7acd02c9ab69a107f4b7c2d687bf501bb65ca157..329d759408ca055fb35c608d05afe9a6746bb617 100644 --- a/src/main/java/teetime/stage/io/File2TextLinesFilter.java +++ b/src/main/java/teetime/stage/io/File2TextLinesFilter.java @@ -46,7 +46,7 @@ public class File2TextLinesFilter extends AbstractConsumerStage<File> { while ((line = reader.readLine()) != null) { line = line.trim(); if (line.length() != 0) { - this.send(this.outputPort, new TextLine(textFile, line)); + outputPort.send(new TextLine(textFile, line)); } // else: ignore empty line } } catch (final FileNotFoundException e) { diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 73a8e99c03d1e978a1a76df9851b379304872d70..24d030f52644fabe78d4bcfb9a3b29b27da1797c 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -40,7 +40,7 @@ public class StringBufferFilter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { final T returnedElement = this.handle(element); - this.send(this.outputPort, returnedElement); + outputPort.send(returnedElement); } @Override diff --git a/src/performancetest/java/teetime/examples/loopStage/Countdown.java b/src/performancetest/java/teetime/examples/loopStage/Countdown.java index bf045270ad614e6973274164cabc78ef11aa68c9..915837b5d69e59d137ae2186012ffb9f4fcc1e72 100644 --- a/src/performancetest/java/teetime/examples/loopStage/Countdown.java +++ b/src/performancetest/java/teetime/examples/loopStage/Countdown.java @@ -1,8 +1,8 @@ package teetime.examples.loopStage; +import teetime.framework.AbstractProducerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.AbstractProducerStage; public class Countdown extends AbstractProducerStage<Void> { @@ -26,10 +26,10 @@ public class Countdown extends AbstractProducerStage<Void> { protected void execute() { Integer countdown = this.countdownInputPort.receive(); if (countdown == 0) { - this.send(this.outputPort, null); + outputPort.send(null); this.terminate(); } else { - this.send(this.newCountdownOutputPort, --countdown); + newCountdownOutputPort.send(--countdown); } }