From 852843758108b8d93ef0d1ddfaf1f354bdec3723 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 5 Dec 2014 15:14:40 +0100 Subject: [PATCH] send(outputPort, element) -> outputPort.send(element) --- .../java/teetime/framework/AbstractStage.java | 17 ----------------- src/main/java/teetime/framework/OutputPort.java | 10 +++++----- .../java/teetime/stage/ByteArray2String.java | 2 +- src/main/java/teetime/stage/Cache.java | 2 +- .../java/teetime/stage/CipherByteArray.java | 2 +- src/main/java/teetime/stage/Clock.java | 2 +- src/main/java/teetime/stage/Counter.java | 2 +- .../stage/ElementDelayMeasuringStage.java | 2 +- .../stage/ElementThroughputMeasuringStage.java | 2 +- .../java/teetime/stage/FileExtensionSwitch.java | 4 +--- .../teetime/stage/InitialElementProducer.java | 2 +- .../java/teetime/stage/InstanceCounter.java | 2 +- .../java/teetime/stage/InstanceOfFilter.java | 2 +- .../java/teetime/stage/IterableProducer.java | 2 +- src/main/java/teetime/stage/NoopFilter.java | 2 +- src/main/java/teetime/stage/ObjectProducer.java | 2 +- src/main/java/teetime/stage/Relay.java | 2 +- .../teetime/stage/StartTimestampFilter.java | 2 +- .../java/teetime/stage/StopTimestampFilter.java | 2 +- src/main/java/teetime/stage/Tokenizer.java | 2 +- src/main/java/teetime/stage/ZipByteArray.java | 2 +- src/main/java/teetime/stage/basic/Delay.java | 2 +- .../java/teetime/stage/basic/merger/Merger.java | 2 +- .../teetime/stage/io/Directory2FilesFilter.java | 2 +- .../java/teetime/stage/io/File2ByteArray.java | 2 +- .../teetime/stage/io/File2TextLinesFilter.java | 2 +- .../stage/stringBuffer/StringBufferFilter.java | 2 +- .../teetime/examples/loopStage/Countdown.java | 6 +++--- 28 files changed, 33 insertions(+), 52 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index b56eb2ae..b541b0aa 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -40,23 +40,6 @@ public abstract class AbstractStage extends Stage { this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); } - /** - * Sends the given <code>element</code> using the default output port - * - * @param element - * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) - */ - protected final <O> boolean send(final OutputPort<O> outputPort, final O element) { - if (!outputPort.send(element)) { - return false; - } - - outputPort.reportNewElement(); - - return true; - // return outputPort.send(element); - } - private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 45a8de63..588669bb 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -14,15 +14,15 @@ public final class OutputPort<T> extends AbstractPort<T> { * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) */ public boolean send(final T element) { - return this.pipe.add(element); + boolean added = this.pipe.add(element); + if (added) { + this.pipe.reportNewElement(); + } + return added; } public void sendSignal(final ISignal signal) { this.pipe.sendSignal(signal); } - public void reportNewElement() { - this.pipe.reportNewElement(); - } - } diff --git a/src/main/java/teetime/stage/ByteArray2String.java b/src/main/java/teetime/stage/ByteArray2String.java index eb27f89b..aca5a438 100644 --- a/src/main/java/teetime/stage/ByteArray2String.java +++ b/src/main/java/teetime/stage/ByteArray2String.java @@ -11,7 +11,7 @@ public class ByteArray2String extends AbstractConsumerStage<byte[]> { @Override protected void execute(final byte[] element) { - this.send(this.outputPort, new String(element, Charset.forName("UTF-8"))); + outputPort.send(new String(element, Charset.forName("UTF-8"))); } public OutputPort<? extends String> getOutputPort() { diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index df63a623..a5991589 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -25,7 +25,7 @@ public class Cache<T> extends AbstractConsumerStage<T> { StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (T cachedElement : this.cachedObjects) { - this.send(this.outputPort, cachedElement); + outputPort.send(cachedElement); } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); diff --git a/src/main/java/teetime/stage/CipherByteArray.java b/src/main/java/teetime/stage/CipherByteArray.java index 62d8db99..f91682f1 100644 --- a/src/main/java/teetime/stage/CipherByteArray.java +++ b/src/main/java/teetime/stage/CipherByteArray.java @@ -75,7 +75,7 @@ public class CipherByteArray extends AbstractConsumerStage<byte[]> { e.printStackTrace(); } - this.send(this.outputPort, output); + outputPort.send(output); } public OutputPort<? extends byte[]> getOutputPort() { diff --git a/src/main/java/teetime/stage/Clock.java b/src/main/java/teetime/stage/Clock.java index a536bc90..96a95441 100644 --- a/src/main/java/teetime/stage/Clock.java +++ b/src/main/java/teetime/stage/Clock.java @@ -25,7 +25,7 @@ public class Clock extends AbstractProducerStage<Long> { } // this.logger.debug("Emitting timestamp"); - this.send(this.outputPort, this.getCurrentTimeInNs()); + outputPort.send(this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/stage/Counter.java b/src/main/java/teetime/stage/Counter.java index 1dc0ccd2..10632de7 100644 --- a/src/main/java/teetime/stage/Counter.java +++ b/src/main/java/teetime/stage/Counter.java @@ -13,7 +13,7 @@ public class Counter<T> extends AbstractConsumerStage<T> { protected void execute(final T element) { this.numElementsPassed++; // this.logger.debug("count: " + this.numElementsPassed); - this.send(this.outputPort, element); + outputPort.send(element); } // BETTER find a solution w/o any thread-safe code in this stage diff --git a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java index 16ae6374..5ba57194 100644 --- a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java @@ -25,7 +25,7 @@ public class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T> { } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java index de5958b1..fe3821c7 100644 --- a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java @@ -26,7 +26,7 @@ public class ElementThroughputMeasuringStage<T> extends AbstractConsumerStage<T> } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/FileExtensionSwitch.java b/src/main/java/teetime/stage/FileExtensionSwitch.java index 1ab666f2..b58f2e90 100644 --- a/src/main/java/teetime/stage/FileExtensionSwitch.java +++ b/src/main/java/teetime/stage/FileExtensionSwitch.java @@ -18,9 +18,7 @@ public class FileExtensionSwitch extends AbstractConsumerStage<File> { String fileExtension = Files.getFileExtension(file.getAbsolutePath()); this.logger.debug("fileExtension: " + fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); - if (outputPort != null) { - this.send(outputPort, file); - } + outputPort.send(file); } public OutputPort<File> addFileExtension(String fileExtension) { diff --git a/src/main/java/teetime/stage/InitialElementProducer.java b/src/main/java/teetime/stage/InitialElementProducer.java index 1b167e48..12af6bf0 100644 --- a/src/main/java/teetime/stage/InitialElementProducer.java +++ b/src/main/java/teetime/stage/InitialElementProducer.java @@ -13,7 +13,7 @@ public class InitialElementProducer<T> extends AbstractProducerStage<T> { @Override protected void execute() { for (T e : this.elements) { - this.send(this.outputPort, e); + outputPort.send(e); } this.terminate(); } diff --git a/src/main/java/teetime/stage/InstanceCounter.java b/src/main/java/teetime/stage/InstanceCounter.java index 9db60208..7b5b4a0d 100644 --- a/src/main/java/teetime/stage/InstanceCounter.java +++ b/src/main/java/teetime/stage/InstanceCounter.java @@ -20,7 +20,7 @@ public class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { this.counter++; } - this.send(this.outputPort, element); + outputPort.send(element); } public int getCounter() { diff --git a/src/main/java/teetime/stage/InstanceOfFilter.java b/src/main/java/teetime/stage/InstanceOfFilter.java index d2e381c0..a32b6963 100644 --- a/src/main/java/teetime/stage/InstanceOfFilter.java +++ b/src/main/java/teetime/stage/InstanceOfFilter.java @@ -21,7 +21,7 @@ public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { @Override protected void execute(final I element) { if (this.type.isInstance(element)) { - this.send(this.outputPort, (O) element); + outputPort.send((O) element); } else { // swallow up the element if (this.logger.isDebugEnabled()) { this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); diff --git a/src/main/java/teetime/stage/IterableProducer.java b/src/main/java/teetime/stage/IterableProducer.java index 398d6e61..f83f1201 100644 --- a/src/main/java/teetime/stage/IterableProducer.java +++ b/src/main/java/teetime/stage/IterableProducer.java @@ -13,7 +13,7 @@ public class IterableProducer<O extends Iterable<T>, T> extends AbstractProducer @Override protected void execute() { for (T i : iter) { - this.send(this.outputPort, i); + outputPort.send(i); } } diff --git a/src/main/java/teetime/stage/NoopFilter.java b/src/main/java/teetime/stage/NoopFilter.java index a023df02..f42247ff 100644 --- a/src/main/java/teetime/stage/NoopFilter.java +++ b/src/main/java/teetime/stage/NoopFilter.java @@ -29,7 +29,7 @@ public class NoopFilter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/ObjectProducer.java b/src/main/java/teetime/stage/ObjectProducer.java index 7b9b76d5..63b4a9c4 100644 --- a/src/main/java/teetime/stage/ObjectProducer.java +++ b/src/main/java/teetime/stage/ObjectProducer.java @@ -57,7 +57,7 @@ public class ObjectProducer<T> extends AbstractProducerStage<T> { T newObject = this.inputObjectCreator.create(); this.numInputObjects--; - this.send(this.outputPort, newObject); + outputPort.send(newObject); if (this.numInputObjects == 0) { this.terminate(); diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index e42808be..08d2ac81 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -21,7 +21,7 @@ public class Relay<T> extends AbstractProducerStage<T> { Thread.yield(); return; } - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/StartTimestampFilter.java b/src/main/java/teetime/stage/StartTimestampFilter.java index 4f5b50ea..67a0cb5d 100644 --- a/src/main/java/teetime/stage/StartTimestampFilter.java +++ b/src/main/java/teetime/stage/StartTimestampFilter.java @@ -31,7 +31,7 @@ public class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> @Override protected void execute(final TimestampObject element) { element.setStartTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/StopTimestampFilter.java b/src/main/java/teetime/stage/StopTimestampFilter.java index 49385bf2..03eb0573 100644 --- a/src/main/java/teetime/stage/StopTimestampFilter.java +++ b/src/main/java/teetime/stage/StopTimestampFilter.java @@ -31,7 +31,7 @@ public class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> @Override protected void execute(final TimestampObject element) { element.setStopTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/Tokenizer.java b/src/main/java/teetime/stage/Tokenizer.java index 96064e6e..2cbb80b0 100644 --- a/src/main/java/teetime/stage/Tokenizer.java +++ b/src/main/java/teetime/stage/Tokenizer.java @@ -18,7 +18,7 @@ public class Tokenizer extends AbstractConsumerStage<String> { protected void execute(final String element) { StringTokenizer st = new StringTokenizer(element, this.regex); while (st.hasMoreTokens()) { - this.send(this.outputPort, st.nextToken()); + outputPort.send(st.nextToken()); } } diff --git a/src/main/java/teetime/stage/ZipByteArray.java b/src/main/java/teetime/stage/ZipByteArray.java index 36dc9dbe..ea3d3d9d 100644 --- a/src/main/java/teetime/stage/ZipByteArray.java +++ b/src/main/java/teetime/stage/ZipByteArray.java @@ -40,7 +40,7 @@ public class ZipByteArray extends AbstractConsumerStage<byte[]> { } catch (Exception e) { e.printStackTrace(); } - this.send(this.outputPort, cache); + outputPort.send(cache); } private byte[] compress(final byte[] data) throws IOException { diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 4682ba64..d2cf63b4 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -29,7 +29,7 @@ public class Delay<T> extends AbstractStage { while (!bufferedElements.isEmpty()) { element = bufferedElements.remove(0); - this.send(this.outputPort, element); + outputPort.send(element); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index bde21d7f..92d37046 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -53,7 +53,7 @@ public class Merger<T> extends AbstractStage { return; } - this.send(this.outputPort, token); + outputPort.send(token); } /** diff --git a/src/main/java/teetime/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/stage/io/Directory2FilesFilter.java index 426cad8c..4fe0d8e3 100644 --- a/src/main/java/teetime/stage/io/Directory2FilesFilter.java +++ b/src/main/java/teetime/stage/io/Directory2FilesFilter.java @@ -79,7 +79,7 @@ public class Directory2FilesFilter extends AbstractConsumerStage<File> { } for (final File file : inputFiles) { - this.send(this.outputPort, file); + outputPort.send(file); } } diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java index 17509009..1319b4fb 100644 --- a/src/main/java/teetime/stage/io/File2ByteArray.java +++ b/src/main/java/teetime/stage/io/File2ByteArray.java @@ -16,7 +16,7 @@ public class File2ByteArray extends AbstractConsumerStage<File> { protected void execute(final File element) { try { byte[] cache = Files.toByteArray(element); - this.send(this.outputPort, cache); + outputPort.send(cache); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/src/main/java/teetime/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/stage/io/File2TextLinesFilter.java index 7acd02c9..329d7594 100644 --- a/src/main/java/teetime/stage/io/File2TextLinesFilter.java +++ b/src/main/java/teetime/stage/io/File2TextLinesFilter.java @@ -46,7 +46,7 @@ public class File2TextLinesFilter extends AbstractConsumerStage<File> { while ((line = reader.readLine()) != null) { line = line.trim(); if (line.length() != 0) { - this.send(this.outputPort, new TextLine(textFile, line)); + outputPort.send(new TextLine(textFile, line)); } // else: ignore empty line } } catch (final FileNotFoundException e) { diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 73a8e99c..24d030f5 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -40,7 +40,7 @@ public class StringBufferFilter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { final T returnedElement = this.handle(element); - this.send(this.outputPort, returnedElement); + outputPort.send(returnedElement); } @Override diff --git a/src/performancetest/java/teetime/examples/loopStage/Countdown.java b/src/performancetest/java/teetime/examples/loopStage/Countdown.java index bf045270..915837b5 100644 --- a/src/performancetest/java/teetime/examples/loopStage/Countdown.java +++ b/src/performancetest/java/teetime/examples/loopStage/Countdown.java @@ -1,8 +1,8 @@ package teetime.examples.loopStage; +import teetime.framework.AbstractProducerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.AbstractProducerStage; public class Countdown extends AbstractProducerStage<Void> { @@ -26,10 +26,10 @@ public class Countdown extends AbstractProducerStage<Void> { protected void execute() { Integer countdown = this.countdownInputPort.receive(); if (countdown == 0) { - this.send(this.outputPort, null); + outputPort.send(null); this.terminate(); } else { - this.send(this.newCountdownOutputPort, --countdown); + newCountdownOutputPort.send(--countdown); } } -- GitLab