From e1b8147bd0bb28d8b386f96026856ccb89ce6115 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Wed, 15 Apr 2015 16:48:19 +0200 Subject: [PATCH] intraStages are now collected and saved in ARunnableStage --- .../framework/AbstractRunnableStage.java | 13 ++++++++++- src/main/java/teetime/framework/Analysis.java | 22 ++++++++++++++++--- src/site/markdown/wiki | 2 +- ...ndling.java => ExceptionHandlingTest.java} | 4 ++-- 4 files changed, 34 insertions(+), 7 deletions(-) rename src/test/java/teetime/framework/exceptionHandling/{ExceptionHandling.java => ExceptionHandlingTest.java} (98%) diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 21c6616c..bdb27075 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -15,17 +15,20 @@ */ package teetime.framework; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.exceptionHandling.StageException; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; +import teetime.framework.exceptionHandling.StageException; import teetime.framework.signal.TerminatingSignal; abstract class AbstractRunnableStage implements Runnable { private final AbstractExceptionListener exceptionHandler; + private Set<Stage> intraStages; private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: "; @@ -87,4 +90,12 @@ abstract class AbstractRunnableStage implements Runnable { protected abstract void executeStage(Stage stage); protected abstract void afterStageExecution(Stage stage); + + public Set<Stage> getIntraStages() { + return intraStages; + } + + public void setIntraStages(final Set<Stage> intraStages) { + this.intraStages = intraStages; + } } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 5c333e6e..43be9ee4 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -17,8 +17,10 @@ package teetime.framework; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; @@ -112,9 +114,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } /** - * This initializes Analysis and needs to be run right before starting it. + * This initializes the analysis and needs to be run right before starting it. * - * @deprecated since 1.1 + * @deprecated since 1.1, analysis will be initialized automatically by the framework */ @Deprecated public final void init() { @@ -127,12 +129,15 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } + AbstractExceptionListener newListener; + Set<Stage> intraStages; for (Stage stage : threadableStageJobs) { - AbstractExceptionListener newListener; + intraStages = traverseIntraStages(stage); newListener = factory.create(); switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage, newListener); + runnableConsumerStage.setIntraStages(intraStages); final Thread thread = new Thread(runnableConsumerStage); stage.setOwningThread(thread); this.consumerThreads.add(thread); @@ -141,6 +146,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_SELF_DECISION: { final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener); + runnable.setIntraStages(intraStages); final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.finiteProducerThreads.add(thread); @@ -149,6 +155,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener); + runnable.setIntraStages(intraStages); final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.infiniteProducerThreads.add(thread); @@ -310,4 +317,13 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } this.exceptions.add(Pair.of(thread, throwable)); } + + private Set<Stage> traverseIntraStages(final Stage stage) { + final Traversor traversor = new Traversor(new IntraStageVisitor()); + if (stage.getOutputPorts().length == 0) { + return new HashSet<Stage>(); + } + traversor.traverse(stage, stage.getOutputPorts()[0].getPipe()); + return traversor.getVisitedStage(); + } } diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki index 0e447457..162510ff 160000 --- a/src/site/markdown/wiki +++ b/src/site/markdown/wiki @@ -1 +1 @@ -Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8 +Subproject commit 162510ff4d2f04011498ba6920aae0c78347c6c8 diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java similarity index 98% rename from src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java rename to src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java index e126f0a9..8aded34b 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java @@ -23,7 +23,7 @@ import org.junit.Test; import teetime.framework.Analysis; -public class ExceptionHandling { +public class ExceptionHandlingTest { private Analysis analysis; @@ -49,8 +49,8 @@ public class ExceptionHandling { * SpScPipe.add and cycle through the sleep method. As a result, the thread will never return to the point * where it checks if it should be terminated. */ - @Test(timeout = 30000) @Ignore + @Test(timeout = 30000) public void forAFewTimes() { for (int i = 0; i < 1000; i++) { newInstances(); -- GitLab