Skip to content
Snippets Groups Projects
Commit 24bd298b authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into uncaught-exceptions

Conflicts:
	src/main/java/teetime/framework/AbstractRunnableStage.java
	src/main/java/teetime/framework/Configuration.java
parents adaecd4f bbd65c67
No related branches found
No related tags found
No related merge requests found
Showing
with 1099 additions and 1224 deletions
...@@ -12,4 +12,5 @@ ...@@ -12,4 +12,5 @@
<file-match-pattern match-pattern="." include-pattern="true"/> <file-match-pattern match-pattern="." include-pattern="true"/>
</fileset> </fileset>
<filter name="WriteProtectedFiles" enabled="true"/> <filter name="WriteProtectedFiles" enabled="true"/>
<filter name="DerivedFiles" enabled="true"/>
</fileset-config> </fileset-config>
This diff is collapsed.
...@@ -65,6 +65,14 @@ ...@@ -65,6 +65,14 @@
<exclude name="AvoidUsingVolatile" /> <exclude name="AvoidUsingVolatile" />
</rule> </rule>
<!-- UR means "undefined reference" which is already detected by the compiler.
so we deactivate it. -->
<rule ref="rulesets/java/controversial.xml/DataflowAnomalyAnalysis">
<properties>
<property name="violationSuppressRegex" value="^Found 'UR'-anomaly.*" />
</properties>
</rule>
<rule ref="rulesets/java/coupling.xml"> <rule ref="rulesets/java/coupling.xml">
<exclude name="LawOfDemeter" /> <exclude name="LawOfDemeter" />
</rule> </rule>
......
...@@ -19,6 +19,7 @@ import teetime.framework.pipe.IPipe; ...@@ -19,6 +19,7 @@ import teetime.framework.pipe.IPipe;
public abstract class AbstractPort<T> { public abstract class AbstractPort<T> {
protected IPipe pipe;
/** /**
* The type of this port. * The type of this port.
* <p> * <p>
...@@ -29,8 +30,6 @@ public abstract class AbstractPort<T> { ...@@ -29,8 +30,6 @@ public abstract class AbstractPort<T> {
private final Stage owningStage; private final Stage owningStage;
private final String name; private final String name;
protected IPipe pipe;
protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) { protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) {
super(); super();
this.type = type; this.type = type;
......
...@@ -39,6 +39,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -39,6 +39,7 @@ abstract class AbstractRunnableStage implements Runnable {
@Override @Override
public final void run() { public final void run() {
final Stage stage = this.stage; // should prevent the stage to be reloaded after a volatile read
this.logger.debug("Executing runnable stage..."); this.logger.debug("Executing runnable stage...");
try { try {
...@@ -67,7 +68,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -67,7 +68,7 @@ abstract class AbstractRunnableStage implements Runnable {
} }
} }
logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); logger.debug("Finished runnable stage. (" + stage.getId() + ")");
} }
protected abstract void beforeStageExecution() throws InterruptedException; protected abstract void beforeStageExecution() throws InterruptedException;
......
...@@ -38,12 +38,12 @@ public abstract class AbstractStage extends Stage { ...@@ -38,12 +38,12 @@ public abstract class AbstractStage extends Stage {
@Override @Override
protected List<InputPort<?>> getInputPorts() { protected List<InputPort<?>> getInputPorts() {
return inputPorts.getOpenedPorts(); return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
} }
@Override @Override
protected List<OutputPort<?>> getOutputPorts() { protected List<OutputPort<?>> getOutputPorts() {
return outputPorts.getOpenedPorts(); return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
} }
@Override @Override
...@@ -74,7 +74,7 @@ public abstract class AbstractStage extends Stage { ...@@ -74,7 +74,7 @@ public abstract class AbstractStage extends Stage {
* @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise * @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise
*/ */
protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) { protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) {
boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass()); final boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass());
if (signalAlreadyReceived) { if (signalAlreadyReceived) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Got signal again: " + signal + " from input port: " + inputPort); logger.trace("Got signal again: " + signal + " from input port: " + inputPort);
......
...@@ -32,6 +32,14 @@ public abstract class Configuration extends AbstractCompositeStage { ...@@ -32,6 +32,14 @@ public abstract class Configuration extends AbstractCompositeStage {
private final IExceptionListenerFactory factory; private final IExceptionListenerFactory factory;
protected Configuration() {
this(new TerminatingExceptionListenerFactory());
}
protected Configuration(final IExceptionListenerFactory factory) {
this.factory = factory;
}
boolean isExecuted() { boolean isExecuted() {
return executed; return executed;
} }
...@@ -44,11 +52,4 @@ public abstract class Configuration extends AbstractCompositeStage { ...@@ -44,11 +52,4 @@ public abstract class Configuration extends AbstractCompositeStage {
return factory; return factory;
} }
protected Configuration() {
this(new TerminatingExceptionListenerFactory());
}
protected Configuration(final IExceptionListenerFactory factory) {
this.factory = factory;
}
} }
...@@ -66,10 +66,7 @@ class ExecutionInstantiation { ...@@ -66,10 +66,7 @@ class ExecutionInstantiation {
final OutputPort outputPort, final InstantiationPipe pipe) { final OutputPort outputPort, final InstantiationPipe pipe) {
Stage targetStage = pipe.getTargetPort().getOwningStage(); Stage targetStage = pipe.getTargetPort().getOwningStage();
int targetColor = DEFAULT_COLOR; int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR;
if (colors.containsKey(targetStage)) {
targetColor = colors.get(targetStage);
}
if (threadableStages.contains(targetStage) && targetColor != color) { if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.capacity() != 0) { if (pipe.capacity() != 0) {
...@@ -94,13 +91,16 @@ class ExecutionInstantiation { ...@@ -94,13 +91,16 @@ class ExecutionInstantiation {
int color = DEFAULT_COLOR; int color = DEFAULT_COLOR;
Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
Integer createdConnections = 0; int numCreatedConnections = 0;
for (Stage threadableStage : threadableStageJobs) { for (Stage threadableStage : threadableStageJobs) {
color++; color++;
colors.put(threadableStage, color); colors.put(threadableStage, color);
createdConnections = colorAndConnectStages(color, colors, threadableStage, configuration); numCreatedConnections += colorAndConnectStages(color, colors, threadableStage, configuration);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Created " + numCreatedConnections + " connections");
} }
LOGGER.debug("Created " + createdConnections + " connections");
} }
} }
...@@ -53,8 +53,9 @@ public class SignalingCounter { ...@@ -53,8 +53,9 @@ public class SignalingCounter {
} }
final Object cond = conditions.get(number); final Object cond = conditions.get(number);
while (counter != number) {
synchronized (cond) { synchronized (cond) { // counter must be wrapped by synchronized to get the latest value
while (counter != number) {
cond.wait(); cond.wait();
} }
} }
......
...@@ -17,20 +17,17 @@ package teetime.util.framework.concurrent.queue.putstrategy; ...@@ -17,20 +17,17 @@ package teetime.util.framework.concurrent.queue.putstrategy;
import java.util.Queue; import java.util.Queue;
public class YieldPutStrategy<E> implements PutStrategy<E> public class YieldPutStrategy<E> implements PutStrategy<E> {
{
@Override @Override
public void backoffOffer(final Queue<E> q, final E e) public void backoffOffer(final Queue<E> q, final E e) {
{ while (!q.offer(e)) {
while (!q.offer(e))
{
Thread.yield(); Thread.yield();
} }
} }
@Override @Override
public void signal() public void signal() {
{
// Nothing // Nothing
} }
} }
...@@ -21,25 +21,22 @@ import java.util.concurrent.locks.LockSupport; ...@@ -21,25 +21,22 @@ import java.util.concurrent.locks.LockSupport;
public final class SCParkTakeStrategy<E> implements TakeStrategy<E> { public final class SCParkTakeStrategy<E> implements TakeStrategy<E> {
public volatile int storeFence = 0; public volatile int storeFence = 0; // NOCS
private final AtomicReference<Thread> t = new AtomicReference<Thread>(null); private final AtomicReference<Thread> t = new AtomicReference<Thread>(null);
@Override @Override
// Make sure the offer is visible before unpark // Make sure the offer is visible before unpark
public void signal() public void signal() {
{
storeFence = 1; // store barrier storeFence = 1; // store barrier
LockSupport.unpark(t.get()); // t.get() load barrier LockSupport.unpark(t.get()); // t.get() load barrier
} }
@Override @Override
public E waitPoll(final Queue<E> q) throws InterruptedException public E waitPoll(final Queue<E> q) throws InterruptedException {
{
E e = q.poll(); E e = q.poll();
if (e != null) if (e != null) {
{
return e; return e;
} }
......
...@@ -30,8 +30,8 @@ public class File2SeqOfWordsTest { ...@@ -30,8 +30,8 @@ public class File2SeqOfWordsTest {
@Test @Test
public void testExecute() throws Exception { public void testExecute() throws Exception {
File2SeqOfWords stage = new File2SeqOfWords(14); final File2SeqOfWords stage = new File2SeqOfWords(14);
List<String> outputSeqOfWords = new ArrayList<String>(); final List<String> outputSeqOfWords = new ArrayList<String>();
StageTester.test(stage).send(Arrays.asList(new File("./src/test/resources/data/input.txt"))).to(stage.getInputPort()).and().receive(outputSeqOfWords) StageTester.test(stage).send(Arrays.asList(new File("./src/test/resources/data/input.txt"))).to(stage.getInputPort()).and().receive(outputSeqOfWords)
.from(stage.getOutputPort()).start(); .from(stage.getOutputPort()).start();
assertEquals(outputSeqOfWords.get(0), "Lorem ipsum"); assertEquals(outputSeqOfWords.get(0), "Lorem ipsum");
......
...@@ -39,7 +39,7 @@ public class TokenizerTest { ...@@ -39,7 +39,7 @@ public class TokenizerTest {
@Test @Test
public void tokenizerShouldJustDelaySingleToken() { public void tokenizerShouldJustDelaySingleToken() {
List<String> results = new ArrayList<String>(); final List<String> results = new ArrayList<String>();
test(tokenizer).and().send("Hello World").to(tokenizer.getInputPort()).and().receive(results).from(tokenizer.getOutputPort()).start(); test(tokenizer).and().send("Hello World").to(tokenizer.getInputPort()).and().receive(results).from(tokenizer.getOutputPort()).start();
...@@ -48,7 +48,7 @@ public class TokenizerTest { ...@@ -48,7 +48,7 @@ public class TokenizerTest {
@Test @Test
public void tokenizerShouldSplitMultipleToken() { public void tokenizerShouldSplitMultipleToken() {
List<String> results = new ArrayList<String>(); final List<String> results = new ArrayList<String>();
test(tokenizer).and().send("Hello;World").to(tokenizer.getInputPort()).and().receive(results).from(tokenizer.getOutputPort()).start(); test(tokenizer).and().send("Hello;World").to(tokenizer.getInputPort()).and().receive(results).from(tokenizer.getOutputPort()).start();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment