diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 23c5758a7a62d1b93f768473b3066d374ba5c6a1..6ed7a57dd69949696e17dd850c36b61a52c536c3 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -6,7 +6,7 @@ </properties> <body> <release version="Snapshot" date="Daily basis" - description="Unstable preview of oncoming versions"> + description="Unstable preview of oncoming versions - 2.0-SNAPSHOT"> <action dev="ntd" type="add" issue="33"> TeeTime automatically chooses the correct type of pipe for all connections. @@ -21,19 +21,24 @@ </action> <action dev="ntd" type="add" issue="171"> Configurations are now - built within an AnalysisContext which is passed on to nested + built within the Configuration class which is passed on to nested CompositeStages. This removes any constraints on CompositeStages and enables therefore multiple connections and multithreading. </action> + <action dev="ntd" type="update"> + Renamed Analysis to Execution + </action> <action dev="ntd" type="remove"> - Marked Pair class as deprecated. + Removed pair class. </action> <action dev="ntd" type="add" issue="154"> All stages will be initialized before starting the analysis. </action> - + <action dev="ntd" type="add" issue="122"> + Threads can be named for better debugging. + </action> </release> <release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1"> diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 0ffd7602798fd70b1852659e0e098bdf76367575..26e8685ef260a18ca2dcf1705cf71f98bff4ba92 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -43,6 +43,18 @@ public abstract class AbstractCompositeStage { return context; } + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + protected final void addThreadableStage(final Stage stage, final String threadName) { + context.addThreadableStage(stage, threadName); + } + /** * Execute this method, to add a stage to the configuration, which should be executed in a own thread. * @@ -50,7 +62,7 @@ public abstract class AbstractCompositeStage { * A arbitrary stage, which will be added to the configuration and executed in a thread. */ protected final void addThreadableStage(final Stage stage) { - context.addThreadableStage(stage); + this.addThreadableStage(stage, stage.getId()); } /** diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index e039883240a4c5809febb778d19d3f9c42c5a7d5..483b1ecbdb93ee23cfd9fec89906d947ba1d9787 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,8 +15,8 @@ */ package teetime.framework; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,19 +33,19 @@ public final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Map<Stage, String> threadableStages = new HashMap<Stage, String>(); ConfigurationContext() {} - Set<Stage> getThreadableStages() { + Map<Stage, String> getThreadableStages() { return this.threadableStages; } /** * @see AbstractCompositeStage#addThreadableStage(Stage) */ - final void addThreadableStage(final Stage stage) { - if (!this.threadableStages.add(stage)) { + final void addThreadableStage(final Stage stage, final String threadName) { + if (this.threadableStages.put(stage, threadName) != null) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); } } @@ -54,8 +54,10 @@ public final class ConfigurationContext { * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { - addThreadableStage(sourcePort.getOwningStage()); + if (sourcePort.getOwningStage().getInputPorts().length == 0) { + if (!threadableStages.containsKey(sourcePort.getOwningStage())) { + addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + } } if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { LOGGER.warn("Overwriting existing pipe while connecting stages " + diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 5ad6fd52d0f5dcaaedf0b6d65b48cbcdd1283146..7a6710b83efe758fa00d4247f3cde83daac4a19a 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); - for (Stage stage : threadableStageJobs) { + final Map<Stage, String> threadableStageJobs = this.configuration.getContext().getThreadableStages(); + for (Stage stage : threadableStageJobs.keySet()) { // // portConnectionValidator.validate(stage); // } @@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); executionInstantiation.instantiatePipes(); - final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); - thread.setName(name); + thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage)); return thread; } @@ -300,7 +301,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getContext().getThreadableStages()) { + for (Stage stage : configuration.getContext().getThreadableStages().keySet()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 8a8de620134b47b154abcec31ee1f6b41861e43a..cea2bcf2e29f8ba38a86033100186e7fcb9655a1 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -45,7 +45,7 @@ class ExecutionInstantiation { @SuppressWarnings("rawtypes") Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { Integer createdConnections = new Integer(0); - Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); for (OutputPort outputPort : threadableStage.getOutputPorts()) { if (outputPort.pipe != null) { if (outputPort.pipe instanceof InstantiationPipe) { @@ -82,7 +82,7 @@ class ExecutionInstantiation { void instantiatePipes() { Integer i = new Integer(0); Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); Integer createdConnections = 0; for (Stage threadableStage : threadableStageJobs) { i++; diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index 028513cf2a29be48d60d00445efb186fade57d70..bc7d850c158edd47da941c1f9bbea2dbd896ec95 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -176,4 +176,23 @@ public class ExecutionTest { } + @Test + public void threadNameing() { + NameConfig configuration = new NameConfig(); + Execution<NameConfig> execution = new Execution<NameConfig>(configuration); + assertThat(configuration.stageWithNamedThread.getOwningThread().getName(), is("TestName")); + } + + private class NameConfig extends Configuration { + + public InitialElementProducer<Object> stageWithNamedThread; + + public NameConfig() { + stageWithNamedThread = new InitialElementProducer<Object>(new Object()); + addThreadableStage(stageWithNamedThread, "TestName"); + connectPorts(stageWithNamedThread.getOutputPort(), new Sink().getInputPort()); + } + + } + }