Skip to content
Snippets Groups Projects
Commit 35b33af2 authored by Christian Wulf's avatar Christian Wulf
Browse files

OutputPort.send now returns a boolean;

added RoundRobinStrategy2
parent 6f719588
No related branches found
No related tags found
No related merge requests found
#FindBugs User Preferences
#Wed Mar 18 10:50:24 CET 2015
#Thu Mar 19 12:32:13 CET 2015
detector_threshold=3
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
......@@ -25,11 +25,14 @@ public final class OutputPort<T> extends AbstractPort<T> {
}
/**
*
* @param element
* to be sent; May not be <code>null</code>.
*
* @return <code>true</code> iff the <code>element</code> was sent; <code>false</code> otherwise.
*/
public void send(final T element) {
this.pipe.add(element);
public boolean send(final T element) {
return this.pipe.add(element);
}
/**
......
......@@ -47,12 +47,18 @@ public final class SpScPipe extends AbstractInterThreadPipe {
@Override
public boolean add(final Object element) {
// BETTER introduce a QueueIsFullStrategy
while (!this.queue.offer(element)) {
this.numWaits++;
Thread.yield();
}
// while (!this.queue.offer(element)) {
// this.numWaits++;
// // Thread.yield();
// try {
// Thread.sleep(0);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// this.reportNewElement();
return true;
return this.queue.offer(element);
}
@Override
......
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.basic.distributor;
import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.0
*/
public final class RoundRobinStrategy2 implements IDistributorStrategy {
private int index = 0;
@Override
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
int numLoops = outputPorts.length;
boolean success;
do {
OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts);
success = outputPort.send(element);
numLoops--;
if (0 == numLoops) {
// Thread.yield();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
numLoops = outputPorts.length;
}
} while (!success);
return true;
}
private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
final OutputPort<T> outputPort = outputPorts[this.index];
this.index = (this.index + 1) % outputPorts.length;
return outputPort;
}
}
......@@ -15,20 +15,35 @@
*/
package teetime.stage.string;
import java.util.concurrent.TimeUnit;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe;
import teetime.util.StopWatch;
public final class Tokenizer extends AbstractConsumerStage<String> {
private final OutputPort<String> outputPort = this.createOutputPort();
private final String regex;
private final StopWatch stopWatch;
public Tokenizer(final String regex) {
this.regex = regex;
stopWatch = new StopWatch();
stopWatch.start();
}
@Override
protected void execute(final String element) {
stopWatch.end();
if (stopWatch.getDurationInNs() >= TimeUnit.SECONDS.toNanos(1)) {
IPipe pipe = inputPort.getPipe();
logger.debug("pipe size: " + pipe.size());
stopWatch.start();
}
String[] tokens = element.split(regex);
for (String token : tokens) {
outputPort.send(token);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment