Skip to content
Snippets Groups Projects
Commit d1b7f47a authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'threadNameing' into 'master'

Thread nameing

fixes #122

See merge request !47
parents 9243c7bd e5abb5c0
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
</properties> </properties>
<body> <body>
<release version="Snapshot" date="Daily basis" <release version="Snapshot" date="Daily basis"
description="Unstable preview of oncoming versions"> description="Unstable preview of oncoming versions - 2.0-SNAPSHOT">
<action dev="ntd" type="add" issue="33"> <action dev="ntd" type="add" issue="33">
TeeTime automatically TeeTime automatically
chooses the correct type of pipe for all connections. chooses the correct type of pipe for all connections.
...@@ -21,19 +21,24 @@ ...@@ -21,19 +21,24 @@
</action> </action>
<action dev="ntd" type="add" issue="171"> <action dev="ntd" type="add" issue="171">
Configurations are now Configurations are now
built within an AnalysisContext which is passed on to nested built within the Configuration class which is passed on to nested
CompositeStages. CompositeStages.
This removes any constraints on CompositeStages and This removes any constraints on CompositeStages and
enables therefore multiple connections and multithreading. enables therefore multiple connections and multithreading.
</action> </action>
<action dev="ntd" type="update">
Renamed Analysis to Execution
</action>
<action dev="ntd" type="remove"> <action dev="ntd" type="remove">
Marked Pair class as deprecated. Removed pair class.
</action> </action>
<action dev="ntd" type="add" issue="154"> <action dev="ntd" type="add" issue="154">
All stages will be All stages will be
initialized before starting the analysis. initialized before starting the analysis.
</action> </action>
<action dev="ntd" type="add" issue="122">
Threads can be named for better debugging.
</action>
</release> </release>
<release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1"> <release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1">
......
...@@ -43,6 +43,18 @@ public abstract class AbstractCompositeStage { ...@@ -43,6 +43,18 @@ public abstract class AbstractCompositeStage {
return context; return context;
} }
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
protected final void addThreadableStage(final Stage stage, final String threadName) {
context.addThreadableStage(stage, threadName);
}
/** /**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread. * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
* *
...@@ -50,7 +62,7 @@ public abstract class AbstractCompositeStage { ...@@ -50,7 +62,7 @@ public abstract class AbstractCompositeStage {
* A arbitrary stage, which will be added to the configuration and executed in a thread. * A arbitrary stage, which will be added to the configuration and executed in a thread.
*/ */
protected final void addThreadableStage(final Stage stage) { protected final void addThreadableStage(final Stage stage) {
context.addThreadableStage(stage); this.addThreadableStage(stage, stage.getId());
} }
/** /**
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.HashSet; import java.util.HashMap;
import java.util.Set; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -33,19 +33,19 @@ public final class ConfigurationContext { ...@@ -33,19 +33,19 @@ public final class ConfigurationContext {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
private final Set<Stage> threadableStages = new HashSet<Stage>(); private final Map<Stage, String> threadableStages = new HashMap<Stage, String>();
ConfigurationContext() {} ConfigurationContext() {}
Set<Stage> getThreadableStages() { Map<Stage, String> getThreadableStages() {
return this.threadableStages; return this.threadableStages;
} }
/** /**
* @see AbstractCompositeStage#addThreadableStage(Stage) * @see AbstractCompositeStage#addThreadableStage(Stage)
*/ */
final void addThreadableStage(final Stage stage) { final void addThreadableStage(final Stage stage, final String threadName) {
if (!this.threadableStages.add(stage)) { if (this.threadableStages.put(stage, threadName) != null) {
LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
} }
} }
...@@ -54,8 +54,10 @@ public final class ConfigurationContext { ...@@ -54,8 +54,10 @@ public final class ConfigurationContext {
* @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int)
*/ */
final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { if (sourcePort.getOwningStage().getInputPorts().length == 0) {
addThreadableStage(sourcePort.getOwningStage()); if (!threadableStages.containsKey(sourcePort.getOwningStage())) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
}
} }
if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { if (sourcePort.getPipe() != null || targetPort.getPipe() != null) {
LOGGER.warn("Overwriting existing pipe while connecting stages " + LOGGER.warn("Overwriting existing pipe while connecting stages " +
......
...@@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler; ...@@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
...@@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
// BETTER validate concurrently // BETTER validate concurrently
private void validateStages() { private void validateStages() {
final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); final Map<Stage, String> threadableStageJobs = this.configuration.getContext().getThreadableStages();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs.keySet()) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
// } // }
...@@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext());
executionInstantiation.instantiatePipes(); executionInstantiation.instantiatePipes();
final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet();
if (threadableStageJobs.isEmpty()) { if (threadableStageJobs.isEmpty()) {
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
} }
...@@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
private Thread createThread(final AbstractRunnableStage runnable, final String name) { private Thread createThread(final AbstractRunnableStage runnable, final String name) {
final Thread thread = new Thread(runnable); final Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(this); thread.setUncaughtExceptionHandler(this);
thread.setName(name); thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage));
return thread; return thread;
} }
...@@ -300,7 +301,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -300,7 +301,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
if (!executionInterrupted) { if (!executionInterrupted) {
executionInterrupted = true; executionInterrupted = true;
LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now.");
for (Stage stage : configuration.getContext().getThreadableStages()) { for (Stage stage : configuration.getContext().getThreadableStages().keySet()) {
if (stage.getOwningThread() != thread) { if (stage.getOwningThread() != thread) {
if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) {
stage.terminate(); stage.terminate();
......
...@@ -45,7 +45,7 @@ class ExecutionInstantiation { ...@@ -45,7 +45,7 @@ class ExecutionInstantiation {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) {
Integer createdConnections = new Integer(0); Integer createdConnections = new Integer(0);
Set<Stage> threadableStageJobs = configuration.getThreadableStages(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
for (OutputPort outputPort : threadableStage.getOutputPorts()) { for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (outputPort.pipe != null) { if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) { if (outputPort.pipe instanceof InstantiationPipe) {
...@@ -82,7 +82,7 @@ class ExecutionInstantiation { ...@@ -82,7 +82,7 @@ class ExecutionInstantiation {
void instantiatePipes() { void instantiatePipes() {
Integer i = new Integer(0); Integer i = new Integer(0);
Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
Integer createdConnections = 0; Integer createdConnections = 0;
for (Stage threadableStage : threadableStageJobs) { for (Stage threadableStage : threadableStageJobs) {
i++; i++;
......
...@@ -176,4 +176,23 @@ public class ExecutionTest { ...@@ -176,4 +176,23 @@ public class ExecutionTest {
} }
@Test
public void threadNameing() {
NameConfig configuration = new NameConfig();
Execution<NameConfig> execution = new Execution<NameConfig>(configuration);
assertThat(configuration.stageWithNamedThread.getOwningThread().getName(), is("TestName"));
}
private class NameConfig extends Configuration {
public InitialElementProducer<Object> stageWithNamedThread;
public NameConfig() {
stageWithNamedThread = new InitialElementProducer<Object>(new Object());
addThreadableStage(stageWithNamedThread, "TestName");
connectPorts(stageWithNamedThread.getOutputPort(), new Sink().getInputPort());
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment