From 35b33af288eeed9aeeae984f2c9ce3500435382a Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 19 Mar 2015 13:15:36 +0100 Subject: [PATCH] OutputPort.send now returns a boolean; added RoundRobinStrategy2 --- .settings/edu.umd.cs.findbugs.core.prefs | 2 +- .../java/teetime/framework/OutputPort.java | 7 ++- .../java/teetime/framework/pipe/SpScPipe.java | 16 +++-- .../distributor/RoundRobinStrategy2.java | 62 +++++++++++++++++++ .../java/teetime/stage/string/Tokenizer.java | 15 +++++ 5 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 3c0de6d0..233d5c8c 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Wed Mar 18 10:50:24 CET 2015 +#Thu Mar 19 12:32:13 CET 2015 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 99204663..50a54d6a 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -25,11 +25,14 @@ public final class OutputPort<T> extends AbstractPort<T> { } /** + * * @param element * to be sent; May not be <code>null</code>. + * + * @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise. */ - public void send(final T element) { - this.pipe.add(element); + public boolean send(final T element) { + return this.pipe.add(element); } /** diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index c1bb0694..b4d5d0a5 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -47,12 +47,18 @@ public final class SpScPipe extends AbstractInterThreadPipe { @Override public boolean add(final Object element) { // BETTER introduce a QueueIsFullStrategy - while (!this.queue.offer(element)) { - this.numWaits++; - Thread.yield(); - } + // while (!this.queue.offer(element)) { + // this.numWaits++; + // // Thread.yield(); + // try { + // Thread.sleep(0); + // } catch (InterruptedException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + // } // this.reportNewElement(); - return true; + return this.queue.offer(element); } @Override diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java new file mode 100644 index 00000000..49350b97 --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.stage.basic.distributor; + +import teetime.framework.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.0 + */ +public final class RoundRobinStrategy2 implements IDistributorStrategy { + + private int index = 0; + + @Override + public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { + int numLoops = outputPorts.length; + + boolean success; + do { + OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts); + success = outputPort.send(element); + numLoops--; + if (0 == numLoops) { + // Thread.yield(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + numLoops = outputPorts.length; + } + } while (!success); + + return true; + } + + private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { + final OutputPort<T> outputPort = outputPorts[this.index]; + + this.index = (this.index + 1) % outputPorts.length; + + return outputPort; + } + +} diff --git a/src/main/java/teetime/stage/string/Tokenizer.java b/src/main/java/teetime/stage/string/Tokenizer.java index 7e451b5b..0a925402 100644 --- a/src/main/java/teetime/stage/string/Tokenizer.java +++ b/src/main/java/teetime/stage/string/Tokenizer.java @@ -15,20 +15,35 @@ */ package teetime.stage.string; +import java.util.concurrent.TimeUnit; + import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; +import teetime.framework.pipe.IPipe; +import teetime.util.StopWatch; public final class Tokenizer extends AbstractConsumerStage<String> { private final OutputPort<String> outputPort = this.createOutputPort(); private final String regex; + private final StopWatch stopWatch; public Tokenizer(final String regex) { this.regex = regex; + stopWatch = new StopWatch(); + stopWatch.start(); } @Override protected void execute(final String element) { + + stopWatch.end(); + if (stopWatch.getDurationInNs() >= TimeUnit.SECONDS.toNanos(1)) { + IPipe pipe = inputPort.getPipe(); + logger.debug("pipe size: " + pipe.size()); + stopWatch.start(); + } + String[] tokens = element.split(regex); for (String token : tokens) { outputPort.send(token); -- GitLab