Skip to content
Snippets Groups Projects
Commit 1410a2de authored by Christian Wulf's avatar Christian Wulf
Browse files

added SignalingCounter

parent 80b358b4
No related branches found
No related tags found
No related merge requests found
......@@ -32,13 +32,16 @@ abstract class AbstractRunnableStage implements Runnable {
this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass());
// stage.owningContext.getThreadCounter().inc();
if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
stage.owningContext.getRunnableCounter().inc();
}
}
@Override
public final void run() {
this.logger.debug("Executing runnable stage...");
// StageException failedException = null;
try {
try {
beforeStageExecution();
try {
......@@ -47,7 +50,6 @@ abstract class AbstractRunnableStage implements Runnable {
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
this.stage.terminate();
// failedException = e;
throw e;
} finally {
afterStageExecution();
......@@ -59,28 +61,14 @@ abstract class AbstractRunnableStage implements Runnable {
} catch (InterruptedException e) {
this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
}
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
// if (failedException != null) {
// sendTerminatingSignal();
// // throw new IllegalStateException("Terminated by StageExceptionListener", failedException);
// throw failedException;
// }
// normal and exceptional termination
// stage.owningContext.getThreadCounter().dec();
} finally {
if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
stage.owningContext.getRunnableCounter().dec();
}
}
//
// private void sendTerminatingSignal() {
// if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) {
// TerminatingSignal signal = new TerminatingSignal();
// // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival
// for (InputPort<?> inputPort : stage.getInputPorts()) {
// stage.onSignal(signal, inputPort);
// }
// }
// }
logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
}
protected abstract void beforeStageExecution() throws InterruptedException;
......
......@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.util.framework.concurrent.SignalingCounter;
/**
* Represents a context that is used by a configuration and composite stages to connect ports, for example.
......@@ -37,12 +38,18 @@ final class ConfigurationContext {
private Map<Stage, String> threadableStages = new HashMap<Stage, String>();
private final SignalingCounter runnableCounter = new SignalingCounter();
ConfigurationContext() {}
Map<Stage, String> getThreadableStages() {
return this.threadableStages;
}
SignalingCounter getRunnableCounter() {
return runnableCounter;
}
/**
* @see AbstractCompositeStage#addThreadableStage(Stage)
*/
......
......@@ -226,17 +226,17 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
*/
public void waitForTermination() {
try {
// stage.owningContext.getThreadCounter().await(0);
getConfiguration().getContext().getRunnableCounter().waitFor(0);
LOGGER.debug("Waiting for finiteProducerThreads");
for (Thread thread : this.finiteProducerThreads) {
thread.join();
}
LOGGER.debug("Waiting for consumerThreads");
for (Thread thread : this.consumerThreads) {
thread.join();
}
// LOGGER.debug("Waiting for finiteProducerThreads");
// for (Thread thread : this.finiteProducerThreads) {
// thread.join();
// }
//
// LOGGER.debug("Waiting for consumerThreads");
// for (Thread thread : this.consumerThreads) {
// thread.join();
// }
} catch (InterruptedException e) {
LOGGER.error("Execution has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) {
......
package teetime.framework;
public class ThreadService {
public Runnable startWithinNewThread(final Stage stage) {
Runnable runnable = wrap(stage);
Thread thread = new Thread(runnable);
thread.start();
return runnable;
}
private AbstractRunnableStage wrap(final Stage stage) {
if (stage.getInputPorts().size() > 0) {
return new RunnableConsumerStage(stage);
}
return new RunnableProducerStage(stage);
}
}
package teetime.util.framework.concurrent;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.IntObjectMap;
public class SignalingCounter {
private final IntObjectMap<Object> conditions = new IntObjectHashMap<Object>();
private int counter;
// synchronized methods synchronize the map and the counter
// synchronized(cond) synchronizes the individual numbers for which are being waited for
public synchronized void inc() {
counter++;
conditionalNotifyAll(counter);
}
public synchronized void dec() {
counter--;
conditionalNotifyAll(counter);
}
private synchronized void conditionalNotifyAll(final int number) {
if (conditions.containsKey(number)) {
Object cond = conditions.get(number);
synchronized (cond) {
cond.notifyAll();
}
}
}
public synchronized void waitFor(final int number) throws InterruptedException {
if (!conditions.containsKey(number)) {
conditions.put(number, new Object());
}
final Object cond = conditions.get(number);
synchronized (cond) {
while (counter != number) {
cond.wait();
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment