diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 3c0de6d05abaa461f39153b10696cb45f2d7e01e..233d5c8cea4349a1b5cc17963d6484ba4084aa22 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Wed Mar 18 10:50:24 CET 2015 +#Thu Mar 19 12:32:13 CET 2015 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 99204663c1badcb60bb58f6e011c844d7052db16..50a54d6a4d2af89c013de029dfc04b71ee242142 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -25,11 +25,14 @@ public final class OutputPort<T> extends AbstractPort<T> { } /** + * * @param element * to be sent; May not be <code>null</code>. + * + * @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise. */ - public void send(final T element) { - this.pipe.add(element); + public boolean send(final T element) { + return this.pipe.add(element); } /** diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index c1bb069433ea68ea34034ebdc7f0cae211b9390f..b4d5d0a5f74c896f260715dda516b8c5017ccd3b 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -47,12 +47,18 @@ public final class SpScPipe extends AbstractInterThreadPipe { @Override public boolean add(final Object element) { // BETTER introduce a QueueIsFullStrategy - while (!this.queue.offer(element)) { - this.numWaits++; - Thread.yield(); - } + // while (!this.queue.offer(element)) { + // this.numWaits++; + // // Thread.yield(); + // try { + // Thread.sleep(0); + // } catch (InterruptedException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + // } // this.reportNewElement(); - return true; + return this.queue.offer(element); } @Override diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java new file mode 100644 index 0000000000000000000000000000000000000000..49350b9751850ccf68696d0c0db5a6cd22eb0089 --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.stage.basic.distributor; + +import teetime.framework.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.0 + */ +public final class RoundRobinStrategy2 implements IDistributorStrategy { + + private int index = 0; + + @Override + public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { + int numLoops = outputPorts.length; + + boolean success; + do { + OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); + success = outputPort.send(element); + numLoops--; + if (0 == numLoops) { + // Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + numLoops = outputPorts.length; + } + } while (!success); + + return true; + } + + private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { + final OutputPort<T> outputPort = outputPorts[this.index]; + + this.index = (this.index + 1) % outputPorts.length; + + return outputPort; + } + +} diff --git a/src/main/java/teetime/stage/string/Tokenizer.java b/src/main/java/teetime/stage/string/Tokenizer.java index 7e451b5b55e321e503f97355fc68622294870bcf..0a9254020f9e0b2769e051d8309325c8879d3025 100644 --- a/src/main/java/teetime/stage/string/Tokenizer.java +++ b/src/main/java/teetime/stage/string/Tokenizer.java @@ -15,20 +15,35 @@ */ package teetime.stage.string; +import java.util.concurrent.TimeUnit; + import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; +import teetime.framework.pipe.IPipe; +import teetime.util.StopWatch; public final class Tokenizer extends AbstractConsumerStage<String> { private final OutputPort<String> outputPort = this.createOutputPort(); private final String regex; + private final StopWatch stopWatch; public Tokenizer(final String regex) { this.regex = regex; + stopWatch = new StopWatch(); + stopWatch.start(); } @Override protected void execute(final String element) { + + stopWatch.end(); + if (stopWatch.getDurationInNs() >= TimeUnit.SECONDS.toNanos(1)) { + IPipe pipe = inputPort.getPipe(); + logger.debug("pipe size: " + pipe.size()); + stopWatch.start(); + } + String[] tokens = element.split(regex); for (String token : tokens) { outputPort.send(token);