From 7ee2b2c3a14d9e621015b6400c2911fa7ce7933f Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Tue, 23 Jun 2015 16:57:35 +0200 Subject: [PATCH] first prototype for stage nameing --- .../framework/AbstractCompositeStage.java | 14 +++++++++++++- .../teetime/framework/ConfigurationContext.java | 16 ++++++++-------- src/main/java/teetime/framework/Execution.java | 11 ++++++----- .../framework/ExecutionInstantiation.java | 4 ++-- 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 0ffd7602..26e8685e 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -43,6 +43,18 @@ public abstract class AbstractCompositeStage { return context; } + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + protected final void addThreadableStage(final Stage stage, final String threadName) { + context.addThreadableStage(stage, threadName); + } + /** * Execute this method, to add a stage to the configuration, which should be executed in a own thread. * @@ -50,7 +62,7 @@ public abstract class AbstractCompositeStage { * A arbitrary stage, which will be added to the configuration and executed in a thread. */ protected final void addThreadableStage(final Stage stage) { - context.addThreadableStage(stage); + this.addThreadableStage(stage, stage.getId()); } /** diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index e0398832..444bafaa 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,8 +15,8 @@ */ package teetime.framework; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,19 +33,19 @@ public final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Map<Stage, String> threadableStages = new HashMap<Stage, String>(); ConfigurationContext() {} - Set<Stage> getThreadableStages() { + Map<Stage, String> getThreadableStages() { return this.threadableStages; } /** * @see AbstractCompositeStage#addThreadableStage(Stage) */ - final void addThreadableStage(final Stage stage) { - if (!this.threadableStages.add(stage)) { + final void addThreadableStage(final Stage stage, final String threadName) { + if (this.threadableStages.put(stage, threadName) != null) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); } } @@ -54,8 +54,8 @@ public final class ConfigurationContext { * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { - addThreadableStage(sourcePort.getOwningStage()); + if (sourcePort.getOwningStage().getInputPorts().length == 0) { + addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { LOGGER.warn("Overwriting existing pipe while connecting stages " + diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index fd15a5af..ac6791ee 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); - for (Stage stage : threadableStageJobs) { + final Map<Stage, String> threadableStageJobs = this.configuration.getContext().getThreadableStages(); + for (Stage stage : threadableStageJobs.keySet()) { // // portConnectionValidator.validate(stage); // } @@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); executionInstantiation.instantiatePipes(); - final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); - thread.setName(name); + thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage)); return thread; } @@ -300,7 +301,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getContext().getThreadableStages()) { + for (Stage stage : configuration.getContext().getThreadableStages().keySet()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 8a8de620..cea2bcf2 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -45,7 +45,7 @@ class ExecutionInstantiation { @SuppressWarnings("rawtypes") Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { Integer createdConnections = new Integer(0); - Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); for (OutputPort outputPort : threadableStage.getOutputPorts()) { if (outputPort.pipe != null) { if (outputPort.pipe instanceof InstantiationPipe) { @@ -82,7 +82,7 @@ class ExecutionInstantiation { void instantiatePipes() { Integer i = new Integer(0); Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStageJobs = configuration.getThreadableStages(); + Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); Integer createdConnections = 0; for (Stage threadableStage : threadableStageJobs) { i++; -- GitLab