Skip to content
Snippets Groups Projects
Commit 4edfc1f6 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into pipe-instantiation

Conflicts:
	src/changes/changes.xml
	src/main/java/teetime/framework/Analysis.java
parents 199e37a2 792a4d3b
No related branches found
No related tags found
1 merge request!41Pipe instantiation
Showing
with 196 additions and 55 deletions
#FindBugs User Preferences
#Wed May 13 15:54:28 CEST 2015
#Fri Jun 12 08:15:02 CEST 2015
detector_threshold=2
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
......@@ -5,71 +5,88 @@
<title>Release Notes</title>
</properties>
<body>
<release version="Snapshot" date="Daily basis" description="Unstable preview of oncoming versions">
<release version="Snapshot" date="Daily basis"
description="Unstable preview of oncoming versions">
<action dev="ntd" type="add" issue="33">
TeeTime automatically chooses the correct type of pipe for all connections.
TeeTime automatically
chooses the correct type of pipe for all connections.
</action>
<action dev="ntd" type="fix" issue="93">
Introduced a new concept for composing stages.
Introduced a new concept
for composing stages.
</action>
<action dev="ntd" type="remove">
Marked Pair class as deprecated.
</action>
<action dev="ntd" type="add" issue="154">
All stages will be
initialized before starting the analysis.
</action>
</release>
<release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1">
<action dev="chw" due-to="Nils C. Ehmke" type="fix" issue="151">
Solved a bug in the merger stage.
</action>
</release>
<release version="1.1.1" date="06.05.2015" description="Minor bugfixes for 1.1">
<action dev="ntd" due-to="Nils C. Ehmke" type="fix" issue="151">
Solved a bug which led to a NullPointerExceptions.
</action>
<action dev="ntd" type="update" issue="102">
Removed deprecated methods.
Removed deprecated
methods.
</action>
</release>
<release version="1.1" date="30.04.2015" description="Second release">
<action dev="ntd" type="add" issue="32">
New concept: exception handling incl. Wiki tutorial.
New concept: exception
handling incl. Wiki tutorial.
</action>
<action due-to="Nils C. Ehmke" type="add" issue="107">
New concept: unit test framework for testing a single stage.
New concept:
unit test framework for testing a single stage.
</action>
<action dev="chw" type="add">
New class: AbstractTransformation;
New class: AbstractTransformation;
Represents a stage with a single input and a single output port.
</action>
<action dev="chw" type="add">
New class: AbstractFilter;
Represents a stage with a single input and a single output port of the same type.
Represents a
stage with a single input and a single output port of the same type.
</action>
<action dev="ntd" type="update" issue="92">
Analysis.start() is now deprecated. Use Analysis.execute() instead.
Analysis.start() is now
deprecated. Use Analysis.execute() instead.
</action>
<action due-to="Arne J. Salveter" type="update" issue="120">
Renamed Stage.executeWithPorts() to Stage.executeStage().
Renamed
Stage.executeWithPorts() to Stage.executeStage().
</action>
<action dev="ntd" type="update" issue="112">
Removed IterableProducer. Use InitialElementProducer instead.
Removed
IterableProducer. Use InitialElementProducer instead.
</action>
<action dev="chw" type="fix" issue="143">
#143 Null values can block the analysis.
#143 Null values can block
the analysis.
</action>
<action dev="ntd" type="fix" issue="109">
#109 Minor bug in ObjectProducer stage.
#109 Minor bug in
ObjectProducer stage.
</action>
<action dev="ntd" type="fix" issue="75">
#75 Signal passing is incorrect.
#75 Signal passing is
incorrect.
</action>
......@@ -77,10 +94,12 @@
Updated dependencies.
</action>
<action dev="ntd" type="update" issue="72">
Jar is not only published via the Central Maven Repository, but also via our CI server Jenkins.
Jar is not only
published via the Central Maven Repository, but also via our CI
server Jenkins.
</action>
</release>
<release version="1.0" date="19.12.2014" description="Initial release">
<action dev="ntd" type="add" issue="66">
Created a new site to
......
......@@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
......@@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override
public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal");
}
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override
public final boolean isClosed() {
return isClosed;
......
......@@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
public void waitForStartSignal() throws InterruptedException {
// do nothing
}
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void waitForInitializingSignal() throws InterruptedException {
// do nothing
}
}
......@@ -25,7 +25,7 @@ abstract class AbstractRunnableStage implements Runnable {
private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: ";
private final Stage stage;
protected final Stage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
......@@ -39,16 +39,16 @@ abstract class AbstractRunnableStage implements Runnable {
this.logger.debug("Executing runnable stage...");
boolean failed = false;
try {
beforeStageExecution(stage);
beforeStageExecution();
try {
do {
executeStage(stage);
executeStage();
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
this.stage.terminate();
failed = true;
}
afterStageExecution(stage);
afterStageExecution();
} catch (RuntimeException e) {
this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
......@@ -72,10 +72,10 @@ abstract class AbstractRunnableStage implements Runnable {
}
protected abstract void beforeStageExecution(Stage stage) throws InterruptedException;
protected abstract void beforeStageExecution() throws InterruptedException;
protected abstract void executeStage(Stage stage);
protected abstract void executeStage();
protected abstract void afterStageExecution(Stage stage);
protected abstract void afterStageExecution();
}
......@@ -75,6 +75,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private int createdConnections = 0;
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
/**
* Creates a new {@link Analysis} that skips validating the port connections and uses the default listener.
*
......@@ -130,10 +132,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
*
*/
private final void init() {
if (initialized) {
return;
}
initialized = true;
instantiatePipes();
......@@ -150,6 +148,12 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
initializeIntraStages(intraStages, thread, newListener);
}
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendInitializingSignal();
}
private Thread initializeThreadableStages(final Stage stage) {
......@@ -165,6 +169,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
......@@ -173,6 +178,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
......@@ -315,9 +321,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
* @since 1.1
*/
public void executeNonBlocking() {
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendStartingSignal();
}
private void startThreads(final Iterable<Thread> threads) {
......@@ -326,6 +330,18 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
}
private void sendInitializingSignal() {
for (RunnableProducerStage runnable : producerRunnables) {
runnable.triggerInitializingSignal();
}
}
private void sendStartingSignal() {
for (RunnableProducerStage runnable : producerRunnables) {
runnable.triggerStartingSignal();
}
}
/**
* Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis
*
......
......@@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> {
pipe.waitForStartSignal();
}
public void waitForInitializingSignal() throws InterruptedException {
pipe.waitForInitializingSignal();
};
}
......@@ -42,19 +42,19 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@SuppressWarnings("PMD.GuardLogStatement")
@Override
protected void beforeStageExecution(final Stage stage) throws InterruptedException {
logger.trace("Waiting for start signals..." + inputPorts);
protected void beforeStageExecution() throws InterruptedException {
logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
inputPort.waitForInitializingSignal();
}
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
}
logger.trace("Starting..." + stage);
logger.trace("Starting... " + stage);
}
@Override
protected void executeStage(final Stage stage) {
protected void executeStage() {
try {
stage.executeStage();
} catch (NotEnoughInputException e) {
......@@ -73,7 +73,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@Override
protected void afterStageExecution(final Stage stage) {
protected void afterStageExecution() {
final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) {
stage.onSignal(signal, inputPort);
......
......@@ -15,30 +15,53 @@
*/
package teetime.framework;
import java.util.concurrent.Semaphore;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends AbstractRunnableStage {
final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0);
public RunnableProducerStage(final Stage stage) {
super(stage);
}
@Override
protected void beforeStageExecution(final Stage stage) {
final StartingSignal startingSignal = new StartingSignal();
stage.onSignal(startingSignal, null);
protected void beforeStageExecution() throws InterruptedException {
waitForInitializingSignal();
this.stage.onSignal(new InitializingSignal(), null);
waitForStartingSignal();
this.stage.onSignal(new StartingSignal(), null);
}
@Override
protected void executeStage(final Stage stage) {
stage.executeStage();
protected void executeStage() {
this.stage.executeStage();
}
@Override
protected void afterStageExecution(final Stage stage) {
protected void afterStageExecution() {
final TerminatingSignal terminatingSignal = new TerminatingSignal();
stage.onSignal(terminatingSignal, null);
this.stage.onSignal(terminatingSignal, null);
}
public void triggerInitializingSignal() {
initSemaphore.release();
}
public void triggerStartingSignal() {
startSemaphore.release();
}
public void waitForInitializingSignal() throws InterruptedException {
initSemaphore.acquire();
}
public void waitForStartingSignal() throws InterruptedException {
startSemaphore.acquire();
}
}
......@@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe {
}
@Override
public void waitForInitializingSignal() throws InterruptedException {
}
@Override
public void close() {
......
......@@ -92,6 +92,8 @@ public interface IPipe {
void waitForStartSignal() throws InterruptedException;
void waitForInitializingSignal() throws InterruptedException;
void close();
}
......@@ -31,8 +31,10 @@ public final class Counter<T> extends AbstractConsumerStage<T> {
outputPort.send(element);
}
// BETTER find a solution w/o any thread-safe code in this stage
public synchronized int getNumElementsPassed() {
/**
* <i>Hint:</i> This method may not be invoked by another thread since it is not thread-safe.
*/
public int getNumElementsPassed() {
return this.numElementsPassed;
}
......
package teetime.framework;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
public class RunnableProducerStageTest {
@Test
public void testInit() {
RunnableTestStage testStage = new RunnableTestStage();
RunnableProducerStage runnable = new RunnableProducerStage(testStage);
Thread thread = new Thread(runnable);
thread.start();
// Not running and not initialized
assertFalse(testStage.executed && testStage.initialized);
runnable.triggerInitializingSignal();
// Not running, but initialized
assertFalse(testStage.executed && !testStage.initialized);
runnable.triggerStartingSignal();
while (!(testStage.getCurrentState() == StageState.TERMINATED)) {
Thread.yield();
}
assertTrue(testStage.executed);
}
}
package teetime.framework;
class RunnableTestStage extends AbstractProducerStage<Object> {
boolean executed, initialized;
@Override
protected void executeStage() {
executed = true;
this.terminate();
}
@Override
protected void execute() {
}
@Override
public void onInitializing() throws Exception {
super.onInitializing();
initialized = true;
}
}
......@@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe {
@Override
public void waitForStartSignal() throws InterruptedException {}
@Override
public void waitForInitializingSignal() throws InterruptedException {}
@Override
public void close() {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment