diff --git a/src/main/java/teetime/framework/A0UnconnectedPort.java b/src/main/java/teetime/framework/A0UnconnectedPort.java new file mode 100644 index 0000000000000000000000000000000000000000..86ef46d145c15bd36da5b85cce7dc4ac84e62227 --- /dev/null +++ b/src/main/java/teetime/framework/A0UnconnectedPort.java @@ -0,0 +1,21 @@ +package teetime.framework; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.DummyPipe; + +public class A0UnconnectedPort implements IPortVisitor { + + private static final Logger LOGGER = LoggerFactory.getLogger(A0UnconnectedPort.class); + + @Override + public void visit(final AbstractPort<?> port) { + if (port.getPipe() == null) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Unconnected output port: " + port + ". Connecting with a dummy output port."); + } + port.setPipe(DummyPipe.INSTANCE); + } + } +} diff --git a/src/main/java/teetime/framework/A1PipeInstantiation.java b/src/main/java/teetime/framework/A1PipeInstantiation.java new file mode 100644 index 0000000000000000000000000000000000000000..3b7c8aabd922c7419234a4d57239a61b698c0c7c --- /dev/null +++ b/src/main/java/teetime/framework/A1PipeInstantiation.java @@ -0,0 +1,56 @@ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; + +public class A1PipeInstantiation implements IPipeVisitor { + + private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + + private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); + + @Override + public VisitorBehavior visit(final IPipe<?> pipe) { + if (visitedPipes.contains(pipe)) { + return VisitorBehavior.STOP; + } + visitedPipes.add(pipe); + + instantiatePipe(pipe); + + return VisitorBehavior.CONTINUE; + } + + private <T> void instantiatePipe(final IPipe<T> pipe) { + if (!(pipe instanceof InstantiationPipe)) { // if manually connected + return; + } + + Thread sourceStageThread = pipe.getSourcePort().getOwningStage().getOwningThread(); + Thread targetStageThread = pipe.getTargetPort().getOwningStage().getOwningThread(); + + if (targetStageThread != null && sourceStageThread != targetStageThread) { + // inter + if (pipe.capacity() != 0) { + interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + } else { + interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + } + return; + } else { + // normal or reflexive pipe => intra + } + + intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + } + +} diff --git a/src/main/java/teetime/framework/A2ThreadableStageCollector.java b/src/main/java/teetime/framework/A2ThreadableStageCollector.java new file mode 100644 index 0000000000000000000000000000000000000000..efe44233a97aaba468116dd51503c68c8f72047f --- /dev/null +++ b/src/main/java/teetime/framework/A2ThreadableStageCollector.java @@ -0,0 +1,35 @@ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.pipe.IPipe; + +public class A2ThreadableStageCollector implements IPipeVisitor { + + private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); + + public Set<Stage> getThreadableStages() { + return threadableStages; + } + + @Override + public VisitorBehavior visit(final IPipe<?> pipe) { + if (visitedPipes.contains(pipe)) { + return VisitorBehavior.STOP; + } + visitedPipes.add(pipe); + + collectThreadableStage(pipe.getSourcePort().getOwningStage()); + collectThreadableStage(pipe.getTargetPort().getOwningStage()); + + return VisitorBehavior.CONTINUE; + } + + private void collectThreadableStage(final Stage stage) { + if (stage.getOwningThread() != null && !threadableStages.contains(stage)) { + threadableStages.add(stage); + } + } +} diff --git a/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java new file mode 100644 index 0000000000000000000000000000000000000000..4120bae5aa0d6c59835c94fb988f1d2adefcb949 --- /dev/null +++ b/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java @@ -0,0 +1,79 @@ +package teetime.framework; + +import java.util.Set; + +import teetime.framework.pipe.DummyPipe; +import teetime.framework.pipe.IPipe; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; + +public class A3InvalidThreadAssignmentCheck { + + private static final int DEFAULT_COLOR = 0; + + private final Set<Stage> threadableStages; + + public A3InvalidThreadAssignmentCheck(final Set<Stage> threadableStages) { + this.threadableStages = threadableStages; + } + + public void check() { + int color = DEFAULT_COLOR; + ObjectIntMap<Stage> colors = new ObjectIntHashMap<Stage>(); + + for (Stage threadableStage : threadableStages) { + color++; + colors.put(threadableStage, color); + + ThreadPainter threadPainter = new ThreadPainter(colors, color, threadableStages); + threadPainter.check(threadableStage); + } + } + + private static class ThreadPainter { + + private final ObjectIntMap<Stage> colors; + private final int color; + private final Set<Stage> threadableStages; + + public ThreadPainter(final ObjectIntMap<Stage> colors, final int color, final Set<Stage> threadableStages) { + super(); + this.colors = colors; + this.color = color; + this.threadableStages = threadableStages; + } + + // TODO consider to implement it as IPipeVisitor(FORWARD) + + public void check(final Stage stage) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + if (outputPort.pipe != DummyPipe.INSTANCE) { + Stage nextStage = checkPipe(outputPort.pipe); + if (nextStage != null) { + check(nextStage); + } + } + } + } + + private Stage checkPipe(final IPipe<?> pipe) { + Stage targetStage = pipe.getTargetPort().getOwningStage(); + int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; + + if (threadableStages.contains(targetStage) && targetColor != color) { + // do nothing + } else { + if (colors.containsKey(targetStage)) { + if (colors.get(targetStage) != color) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + colors.put(targetStage, color); + return targetStage; + } + return null; + } + + } +} diff --git a/src/main/java/teetime/framework/A4StageAttributeSetter.java b/src/main/java/teetime/framework/A4StageAttributeSetter.java new file mode 100644 index 0000000000000000000000000000000000000000..862e95cb107310a2dbeaf73b9a6fc0d1575a4f08 --- /dev/null +++ b/src/main/java/teetime/framework/A4StageAttributeSetter.java @@ -0,0 +1,37 @@ +package teetime.framework; + +import java.util.Set; + +public class A4StageAttributeSetter { + + private final Configuration configuration; + private final Set<Stage> threadableStages; + + public A4StageAttributeSetter(final Configuration configuration, final Set<Stage> threadableStages) { + super(); + this.configuration = configuration; + this.threadableStages = threadableStages; + } + + public void setAttributes() { + for (Stage threadableStage : threadableStages) { + IPipeVisitor pipeVisitor = new IntraStageCollector(); + Traverser traverser = new Traverser(pipeVisitor); + traverser.traverse(threadableStage); + + setAttributes(threadableStage, traverser.getVisitedStages()); + } + } + + private void setAttributes(final Stage threadableStage, final Set<Stage> intraStages) { + threadableStage.setExceptionHandler(configuration.getFactory().createInstance()); + // threadableStage.setOwningThread(owningThread); + threadableStage.setOwningContext(configuration.getContext()); + + for (Stage stage : intraStages) { + stage.setExceptionHandler(threadableStage.exceptionListener); + stage.setOwningThread(threadableStage.getOwningThread()); + stage.setOwningContext(threadableStage.getOwningContext()); + } + } +} diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 2a1efcf00374f7efb9fe2cbe5592d2158a73816f..93c9870e5ef763f2d3a7f2cd4a705219d076671a 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -60,9 +60,11 @@ public abstract class AbstractCompositeStage { * @param threadName * A string which can be used for debugging. */ - protected final void addThreadableStage(final Stage stage, final String threadName) { + protected void addThreadableStage(final Stage stage, final String threadName) { // context.addThreadableStage(stage, threadName); - stage.setOwningThread(new Thread(threadName)); + AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); + Thread newThread = new TeeTimeThread(runnable, threadName); + stage.setOwningThread(newThread); } /** @@ -91,7 +93,7 @@ public abstract class AbstractCompositeStage { * @param <T> * the type of elements to be sent */ - protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { // context.connectPorts(sourcePort, targetPort, capacity); connectPortsInternal(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 31b3fbd60bff2ff62e8c9d042b0b286c24b6d982..55c1f55b3a7cc9cdff05aad45b54c49627e5d8d9 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -69,4 +69,9 @@ public abstract class AbstractPipe<T> implements IPipe<T> { public final int capacity() { return capacity; } + + @Override + public String toString() { + return sourcePort.getOwningStage().getId() + " -> " + targetPort.getOwningStage().getId() + " (" + super.toString() + ")"; + } } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 2fe5827caffa46a45cf1ab60913a66916586b8c2..10d0a890a4e6ac886115203350a232145f8791e0 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -28,17 +28,13 @@ abstract class AbstractRunnableStage implements Runnable { @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; - public AbstractRunnableStage(final Stage stage) { + protected AbstractRunnableStage(final Stage stage) { if (stage == null) { throw new IllegalArgumentException("Argument stage may not be null"); } this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); - - if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getThreadService().getRunnableCounter().inc(); - } } @Override @@ -55,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable { } while (!Thread.currentThread().isInterrupted()); } catch (TerminateException e) { this.stage.terminate(); - stage.owningContext.abortConfigurationRun(); + stage.getOwningContext().abortConfigurationRun(); } finally { afterStageExecution(); } @@ -68,7 +64,7 @@ abstract class AbstractRunnableStage implements Runnable { } } finally { if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getThreadService().getRunnableCounter().dec(); + stage.getOwningContext().getThreadService().getRunnableCounter().dec(); } } @@ -81,4 +77,12 @@ abstract class AbstractRunnableStage implements Runnable { protected abstract void afterStageExecution(); + public static AbstractRunnableStage create(final Stage stage) { + if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { + return new RunnableConsumerStage(stage); + } else { + return new RunnableProducerStage(stage); + } + } + } diff --git a/src/main/java/teetime/framework/AbstractService.java b/src/main/java/teetime/framework/AbstractService.java index 029a6ef6d37792c0da3747e388ec87efa2b2a4a6..c103b03cab44a26b45f8faffdaa98778aee83863 100644 --- a/src/main/java/teetime/framework/AbstractService.java +++ b/src/main/java/teetime/framework/AbstractService.java @@ -22,6 +22,6 @@ abstract class AbstractService<T> { abstract void onFinish(); - abstract void merge(T source); + // abstract void merge(T source); } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 4d0218d23051bb115fbfb0dff4f097f72d098857..ec7070130bc0f6c3e3a41361b9282c192938f085 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -19,7 +19,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -28,8 +27,6 @@ import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { - private static final IPipe DUMMY_PIPE = new DummyPipe(); - private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); @@ -90,22 +87,9 @@ public abstract class AbstractStage extends Stage { @Override public void onInitializing() throws Exception { - this.connectUnconnectedOutputPorts(); changeState(StageState.INITIALIZED); } - @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.outputPorts.getOpenedPorts()) { - if (null == outputPort.getPipe()) { // if port is unconnected - if (logger.isInfoEnabled()) { - this.logger.info("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); - } - outputPort.setPipe(DUMMY_PIPE); - } - } - } - private void changeState(final StageState newState) { currentState = newState; logger.trace(newState.toString()); @@ -257,7 +241,7 @@ public abstract class AbstractStage extends Stage { @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { - final IPipe pipe = outputPort.getPipe(); + final IPipe<?> pipe = outputPort.getPipe(); final Class<?> sourcePortType = outputPort.getType(); final Class<?> targetPortType = pipe.getTargetPort().getType(); @@ -271,7 +255,7 @@ public abstract class AbstractStage extends Stage { @Override protected void terminate() { changeState(StageState.TERMINATING); - owningThread.interrupt(); + getOwningThread().interrupt(); } @Override diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 77a3d00b12afe404455e002721ee07407021f4a4..418569c62fd9a946da894194e7aa33febac8e700 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -28,9 +28,11 @@ import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; */ public abstract class Configuration extends AbstractCompositeStage { - private boolean executed; - private final IExceptionListenerFactory factory; + private final ConfigurationContext context; + + private boolean executed; + private Stage startStage; protected Configuration() { this(new TerminatingExceptionListenerFactory()); @@ -38,6 +40,7 @@ public abstract class Configuration extends AbstractCompositeStage { protected Configuration(final IExceptionListenerFactory factory) { this.factory = factory; + this.context = new ConfigurationContext(this); } boolean isExecuted() { @@ -52,4 +55,24 @@ public abstract class Configuration extends AbstractCompositeStage { return factory; } + @Override + protected void addThreadableStage(final Stage stage, final String threadName) { + startStage = stage; // memorize an arbitrary stage as starting point for traversing + super.addThreadableStage(stage, threadName); + } + + @Override + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + startStage = sourcePort.getOwningStage(); // memorize an arbitrary stage as starting point for traversing + super.connectPorts(sourcePort, targetPort, capacity); + } + + ConfigurationContext getContext() { + return context; + } + + Stage getStartStage() { + return startStage; + } + } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 41fe620bb45dad13e1345cdf33554236fe3ee525..38c9d9cc706c97f048bbb73ff6cc85db7e912b21 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,15 +15,8 @@ */ package teetime.framework; -import java.util.HashSet; -import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import teetime.framework.pipe.InstantiationPipe; - /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. * Stages can be added by executing {@link #addThreadableStage(Stage)}. @@ -32,78 +25,78 @@ import teetime.framework.pipe.InstantiationPipe; */ final class ConfigurationContext { - static final ConfigurationContext EMPTY_CONTEXT = new ConfigurationContext(null); + // static final ConfigurationContext EMPTY_CONTEXT = new ConfigurationContext(null); - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private final Set<ConfigurationContext> children = new HashSet<ConfigurationContext>(); // parent-child-tree + // private final Set<ConfigurationContext> children = new HashSet<ConfigurationContext>(); // parent-child-tree private ThreadService threadService; - ConfigurationContext(final AbstractCompositeStage compositeStage) { - this.threadService = new ThreadService(compositeStage); + ConfigurationContext(final Configuration configuration) { + this.threadService = new ThreadService(configuration); } - Map<Stage, String> getThreadableStages() { + Set<Stage> getThreadableStages() { return threadService.getThreadableStages(); } /** * @see AbstractCompositeStage#addThreadableStage(Stage) */ - final void addThreadableStage(final Stage stage, final String threadName) { - addChildContext(stage); - threadService.addThreadableStage(stage, threadName); - } + // final void addThreadableStage(final Stage stage, final String threadName) { + // addChildContext(stage); + // threadService.addThreadableStage(stage, threadName); + // } /** * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ - final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().size() == 0) { - if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { - addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); - } - } - - if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) { - LOGGER.warn("Overwriting existing pipe while connecting stages " + - sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); - } - - addChildContext(sourcePort.getOwningStage()); - addChildContext(targetPort.getOwningStage()); - - new InstantiationPipe(sourcePort, targetPort, capacity); - } - - final void addChildContext(final Stage stage) { - if (!stage.owningContext.equals(EMPTY_CONTEXT)) { - if (stage.owningContext != this) { // Performance - children.add(stage.owningContext); - } - } else { - stage.owningContext = this; - } - } - - final void initializeContext() { - for (ConfigurationContext child : children) { - child.initializeContext(); - mergeContexts(child); - } - } + // final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + // if (sourcePort.getOwningStage().getInputPorts().size() == 0) { + // if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { + // addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + // } + // } + // + // if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) { + // LOGGER.warn("Overwriting existing pipe while connecting stages " + + // sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); + // } + // + // addChildContext(sourcePort.getOwningStage()); + // addChildContext(targetPort.getOwningStage()); + // + // new InstantiationPipe(sourcePort, targetPort, capacity); + // } + + // final void addChildContext(final Stage stage) { + // if (!stage.owningContext.equals(EMPTY_CONTEXT)) { + // if (stage.owningContext != this) { // Performance + // children.add(stage.owningContext); + // } + // } else { + // stage.owningContext = this; + // } + // } + // + // final void initializeContext() { + // for (ConfigurationContext child : children) { + // child.initializeContext(); + // mergeContexts(child); + // } + // } final void initializeServices() { threadService.onInitialize(); } - private void mergeContexts(final ConfigurationContext child) { - threadService.merge(child.getThreadService()); - - // Finally copy parent services - child.threadService = this.threadService; - } + // private void mergeContexts(final ConfigurationContext child) { + // threadService.merge(child.getThreadService()); + // + // // Finally copy parent services + // child.threadService = this.threadService; + // } void executeConfiguration() { this.threadService.onExecute(); diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index ee63f6d6b4cef526af20995fe2207b0dcd91acef..523b489ee803809a8b0b9d5b5c09a6c85d0af8cb 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -31,8 +31,8 @@ public class DynamicActuator { } public Runnable startWithinNewThread(final Stage previousStage, final Stage stage) { - SignalingCounter runtimeCounter = previousStage.owningContext.getThreadService().getRunnableCounter(); - SignalingCounter newCounter = stage.owningContext.getThreadService().getRunnableCounter(); + SignalingCounter runtimeCounter = previousStage.getOwningContext().getThreadService().getRunnableCounter(); + SignalingCounter newCounter = stage.getOwningContext().getThreadService().getRunnableCounter(); // runtimeCounter.inc(newCounter); // stage.logger.error(stage.owningContext.getThreadService().getRunnableCounter().toString()); diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 3e71626868284fc9b5c9cce83ccb7b5e3ad77cea..e6e608863c4da47bd8094e9bd54f1d10899c6c8d 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -15,12 +15,11 @@ */ package teetime.framework; -import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.Traversor.Direction; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -43,7 +42,6 @@ public final class Execution<T extends Configuration> { private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); private final T configuration; - private final ConfigurationContext configurationContext; /** @@ -68,8 +66,7 @@ public final class Execution<T extends Configuration> { */ public Execution(final T configuration, final boolean validationEnabled) { this.configuration = configuration; - // this.configurationContext = configuration.getContext(); - this.configurationContext = new ConfigurationContext(configuration); + this.configurationContext = configuration.getContext(); if (configuration.isExecuted()) { throw new IllegalStateException("Configuration was already executed"); } @@ -82,8 +79,8 @@ public final class Execution<T extends Configuration> { // BETTER validate concurrently private void validateStages() { - final Map<Stage, String> threadableStageJobs = configurationContext.getThreadableStages(); - for (Stage stage : threadableStageJobs.keySet()) { + final Set<Stage> threadableStages = configurationContext.getThreadableStages(); + for (Stage stage : threadableStages) { // // portConnectionValidator.validate(stage); // } @@ -100,15 +97,10 @@ public final class Execution<T extends Configuration> { * */ private final void init() { - ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configurationContext); - executionInstantiation.instantiatePipes(); - - IPipeVisitor pipeVisitor = new StageCollector(); - Traversor traversor = new Traversor(pipeVisitor, Direction.BOTH); - // TODO iterate through each producer - // traversor.traverse(stage); + // ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configurationContext); + // executionInstantiation.instantiatePipes(); - configurationContext.initializeContext(); + // configurationContext.initializeContext(); configurationContext.initializeServices(); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 4334cf4252d69d25f88cee187a7548f5f8c8c05b..ddb6188c2b05e9bdfc00001007de6430298cd4ca 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -34,14 +34,14 @@ class ExecutionInstantiation { private final ConfigurationContext context; - public ExecutionInstantiation(final ConfigurationContext context) { + private ExecutionInstantiation(final ConfigurationContext context) { this.context = context; } void instantiatePipes() { int color = DEFAULT_COLOR; Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStages = context.getThreadableStages().keySet(); + Set<Stage> threadableStages = context.getThreadableStages(); for (Stage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); diff --git a/src/main/java/teetime/framework/IPortVisitor.java b/src/main/java/teetime/framework/IPortVisitor.java new file mode 100644 index 0000000000000000000000000000000000000000..28f314b35043f9987b9ea514fa9e35e13fe2ac97 --- /dev/null +++ b/src/main/java/teetime/framework/IPortVisitor.java @@ -0,0 +1,7 @@ +package teetime.framework; + +public interface IPortVisitor { + + void visit(AbstractPort<?> port); + +} diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index a23ae092f69297a93fc8648d627d20dcab04b966..03ecfdc28774964a797f4b2923d637267f9daa36 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -32,10 +32,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { + logger.trace("waitForInitializingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForInitializingSignal(); } - + logger.trace("waitForStartingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 8b51301e873578cbbcc098d42278965445da8a6b..7af474dd13600acc188369d8e3b3a824d9fc3b46 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -48,9 +48,17 @@ public abstract class Stage { protected AbstractExceptionListener exceptionListener; /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ - protected Thread owningThread; + private Thread owningThread; - ConfigurationContext owningContext = ConfigurationContext.EMPTY_CONTEXT; + private ConfigurationContext owningContext; + + ConfigurationContext getOwningContext() { + return owningContext; + } + + void setOwningContext(final ConfigurationContext owningContext) { + this.owningContext = owningContext; + } protected Stage() { this.id = this.createId(); @@ -135,6 +143,10 @@ public abstract class Stage { } void setOwningThread(final Thread owningThread) { + if (this.owningThread != null && this.owningThread != owningThread) { + // checks also for "crossing threads" + // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); + } this.owningThread = owningThread; } diff --git a/src/main/java/teetime/framework/StageCollector.java b/src/main/java/teetime/framework/StageCollector.java deleted file mode 100644 index 89d28ad77bc2bce29fab9d534628ad2c0d04def3..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/StageCollector.java +++ /dev/null @@ -1,13 +0,0 @@ -package teetime.framework; - -import teetime.framework.pipe.IPipe; - -public class StageCollector implements IPipeVisitor { - - @Override - public VisitorBehavior visit(IPipe outputPipe) { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/src/main/java/teetime/framework/TeeTimeThread.java b/src/main/java/teetime/framework/TeeTimeThread.java new file mode 100644 index 0000000000000000000000000000000000000000..00e96fac780293eef26a0693d69b2deb678ebd23 --- /dev/null +++ b/src/main/java/teetime/framework/TeeTimeThread.java @@ -0,0 +1,29 @@ +package teetime.framework; + +public class TeeTimeThread extends Thread { + + private final AbstractRunnableStage runnable; + + public TeeTimeThread(final AbstractRunnableStage runnable, final String name) { + super(runnable, name); + this.runnable = runnable; + } + + public void sendInitializingSignal() { + if (runnable instanceof RunnableProducerStage) { + ((RunnableProducerStage) runnable).triggerInitializingSignal(); + } + } + + public void sendStartingSignal() { + if (runnable instanceof RunnableProducerStage) { + ((RunnableProducerStage) runnable).triggerStartingSignal(); + } + } + + @Override + public synchronized void start() { + runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc(); + super.start(); + } +} diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 7c76df6fce9d0987076241f6bd325bd1df74bd5f..b80e0a36fba7df6cf5af809df91154c06112613f 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -1,17 +1,14 @@ package teetime.framework; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.exceptionHandling.AbstractExceptionListener; -import teetime.framework.exceptionHandling.IExceptionListenerFactory; +import teetime.framework.Traverser.Direction; import teetime.util.framework.concurrent.SignalingCounter; /** @@ -23,8 +20,6 @@ import teetime.util.framework.concurrent.SignalingCounter; */ class ThreadService extends AbstractService<ThreadService> { - private Map<Stage, String> threadableStages = new HashMap<Stage, String>(); - private static final Logger LOGGER = LoggerFactory.getLogger(ThreadService.class); private final List<Thread> consumerThreads = new LinkedList<Thread>(); @@ -32,36 +27,67 @@ class ThreadService extends AbstractService<ThreadService> { private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); private final SignalingCounter runnableCounter = new SignalingCounter(); + private final Configuration configuration; - private final AbstractCompositeStage compositeStage; + private Set<Stage> threadableStages; - public ThreadService(final AbstractCompositeStage compositeStage) { - this.compositeStage = compositeStage; + public ThreadService(final Configuration configuration) { + this.configuration = configuration; } - private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); - @Override void onInitialize() { - // is invoked only by a Configuration - IExceptionListenerFactory factory = ((Configuration) compositeStage).getFactory(); + Stage startStage = configuration.getStartStage(); + if (startStage == null) { + throw new IllegalStateException("The start stage may not be null."); + } + + // TODO visit(port) only + // TODO use decorator pattern to combine all analyzes so that only one traverser pass is necessary + IPortVisitor portVisitor = new A0UnconnectedPort(); + IPipeVisitor pipeVisitor = new A1PipeInstantiation(); + Traverser traversor = new Traverser(portVisitor, pipeVisitor, Direction.BOTH); + traversor.traverse(startStage); + + A2ThreadableStageCollector stageCollector = new A2ThreadableStageCollector(); + traversor = new Traverser(stageCollector, Direction.BOTH); + traversor.traverse(startStage); + threadableStages = stageCollector.getThreadableStages(); if (threadableStages.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } - for (Stage stage : threadableStages.keySet()) { - final Thread thread = initializeStage(stage); + A3InvalidThreadAssignmentCheck checker = new A3InvalidThreadAssignmentCheck(threadableStages); + checker.check(); - final Set<Stage> intraStages = traverseIntraStages(stage); + A4StageAttributeSetter attributeSetter = new A4StageAttributeSetter(configuration, threadableStages); + attributeSetter.setAttributes(); - final AbstractExceptionListener newListener = factory.createInstance(); - initializeIntraStages(intraStages, thread, newListener); + for (Stage stage : threadableStages) { + categorizeThreadableStage(stage); } onStart(); } + private void categorizeThreadableStage(final Stage stage) { + switch (stage.getTerminationStrategy()) { + case BY_INTERRUPT: + infiniteProducerThreads.add(stage.getOwningThread()); + break; + case BY_SELF_DECISION: + finiteProducerThreads.add(stage.getOwningThread()); + break; + case BY_SIGNAL: + consumerThreads.add(stage.getOwningThread()); + break; + default: + LOGGER.warn("Unknown termination strategy '" + stage.getTerminationStrategy() + "' in stage " + stage); + break; + } + } + @Override void onStart() { startThreads(this.consumerThreads); @@ -78,7 +104,7 @@ class ThreadService extends AbstractService<ThreadService> { @Override void onTerminate() { - for (Stage stage : threadableStages.keySet()) { + for (Stage stage : threadableStages) { stage.terminate(); } } @@ -122,62 +148,6 @@ class ThreadService extends AbstractService<ThreadService> { return exceptions; } - private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { - for (Stage intraStage : intraStages) { - intraStage.setOwningThread(thread); - intraStage.setExceptionHandler(newListener); - } - } - - private Thread initializeStage(final Stage stage) { - final Thread thread; - - final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); - switch (terminationStrategy) { - case BY_SIGNAL: { - final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - thread = createThread(runnable, stage.getId()); - this.consumerThreads.add(thread); - break; - } - case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.finiteProducerThreads.add(thread); - break; - } - case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.infiniteProducerThreads.add(thread); - break; - } - default: - throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); - } - return thread; - } - - private Thread createThread(final AbstractRunnableStage runnable, final String name) { - final Thread thread = new Thread(runnable); - thread.setName(threadableStages.get(runnable.stage)); - return thread; - } - - private Set<Stage> traverseIntraStages(final Stage stage) { - final Traversor traversor = new Traversor(new IntraStageCollector()); - traversor.traverse(stage); - return traversor.getVisitedStage(); - } - - void addThreadableStage(final Stage stage, final String threadName) { - if (this.threadableStages.put(stage, threadName) != null && LOGGER.isWarnEnabled()) { - LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); - } - } - private void startThreads(final Iterable<Thread> threads) { for (Thread thread : threads) { thread.start(); @@ -185,30 +155,32 @@ class ThreadService extends AbstractService<ThreadService> { } private void sendInitializingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerInitializingSignal(); + for (Thread thread : infiniteProducerThreads) { + ((TeeTimeThread) thread).sendInitializingSignal(); + } + for (Thread thread : finiteProducerThreads) { + ((TeeTimeThread) thread).sendInitializingSignal(); } } private void sendStartingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerStartingSignal(); + for (Thread thread : infiniteProducerThreads) { + ((TeeTimeThread) thread).sendStartingSignal(); + } + for (Thread thread : finiteProducerThreads) { + ((TeeTimeThread) thread).sendStartingSignal(); } } - Map<Stage, String> getThreadableStages() { + Set<Stage> getThreadableStages() { return threadableStages; } - void setThreadableStages(final Map<Stage, String> threadableStages) { - this.threadableStages = threadableStages; - } - - @Override - void merge(final ThreadService source) { - threadableStages.putAll(source.getThreadableStages()); - // runnableCounter.inc(source.runnableCounter); - } + // @Override + // void merge(final ThreadService source) { + // threadableStages.putAll(source.getThreadableStages()); + // // runnableCounter.inc(source.runnableCounter); + // } SignalingCounter getRunnableCounter() { return runnableCounter; diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java new file mode 100644 index 0000000000000000000000000000000000000000..d430e33de7ea9fa841604d858b8eebcefb06dfcd --- /dev/null +++ b/src/main/java/teetime/framework/Traverser.java @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.IPipeVisitor.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; +import teetime.framework.pipe.IPipe; + +public class Traverser { + + public static enum Direction { + BACKWARD(1), FORWARD(2), BOTH(BACKWARD.value | FORWARD.value); + + private final int value; + + private Direction(final int value) { + this.value = value; + } + + public boolean represents(final Direction direction) { + return (value & direction.value) == direction.value; + } + } + + private static final IPortVisitor DEFAULT_PORT_VISITOR = new IPortVisitor() { + @Override + public void visit(final AbstractPort<?> port) { + // do nothing + } + }; + + private final IPortVisitor portVisitor; + private final IPipeVisitor pipeVisitor; + private final Direction direction; + private final Set<Stage> visitedStages = new HashSet<Stage>(); + + public Traverser(final IPipeVisitor pipeVisitor) { + this(pipeVisitor, Direction.FORWARD); + } + + public Traverser(final IPipeVisitor pipeVisitor, final Direction direction) { + this(DEFAULT_PORT_VISITOR, pipeVisitor, direction); + } + + public Traverser(final IPortVisitor portVisitor, final IPipeVisitor pipeVisitor, final Direction direction) { + this.portVisitor = portVisitor; + this.pipeVisitor = pipeVisitor; + this.direction = direction; + } + + public void traverse(final Stage stage) { + if (!visitedStages.add(stage)) { + return; + } + + if (direction.represents(Direction.FORWARD)) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + visitAndTraverse(outputPort, Direction.FORWARD); + } + } + + if (direction.represents(Direction.BACKWARD)) { + for (InputPort<?> inputPort : stage.getInputPorts()) { + visitAndTraverse(inputPort, Direction.BACKWARD); + } + } + } + + private void visitAndTraverse(final AbstractPort<?> port, final Direction direction) { + portVisitor.visit(port); + IPipe<?> pipe = port.getPipe(); + if (pipe != DummyPipe.INSTANCE && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { + AbstractPort<?> nextPort = (direction == Direction.FORWARD) ? pipe.getTargetPort() : pipe.getSourcePort(); + traverse(nextPort.getOwningStage()); // recursive call + } + } + + public Set<Stage> getVisitedStages() { + return visitedStages; + } +} diff --git a/src/main/java/teetime/framework/Traversor.java b/src/main/java/teetime/framework/Traversor.java deleted file mode 100644 index 67d3f6c75c814caae78f750400dc5515c2a79c73..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/Traversor.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -import java.util.HashSet; -import java.util.Set; - -import teetime.framework.IPipeVisitor.VisitorBehavior; -import teetime.framework.pipe.IPipe; - -public class Traversor { - - public static enum Direction { - BACKWARD, FORWARD, BOTH - } - - private final IPipeVisitor pipeVisitor; - private final Direction direction; - private final Set<Stage> visitedStages = new HashSet<Stage>(); - - public Traversor(final IPipeVisitor pipeVisitor) { - this(pipeVisitor, Direction.FORWARD); - } - - public Traversor(final IPipeVisitor pipeVisitor, final Direction direction) { - this.pipeVisitor = pipeVisitor; - this.direction = direction; - } - - public void traverse(final Stage stage) { - if (!visitedStages.add(stage)) { - return; - } - - if (direction == Direction.BOTH || direction == Direction.FORWARD) { - for (OutputPort<?> outputPort : stage.getOutputPorts()) { - visitAndTraverse(outputPort); - } - } - - if (direction == Direction.BOTH || direction == Direction.BACKWARD) { - for (InputPort<?> inputPort : stage.getInputPorts()) { - visitAndTraverse(inputPort); - } - } - } - - private void visitAndTraverse(final AbstractPort<?> port) { - IPipe pipe = port.getPipe(); - if (null != pipe && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { - Stage owningStage = pipe.getTargetPort().getOwningStage(); - traverse(owningStage); // recursive call - } - } - - public Set<Stage> getVisitedStage() { - return visitedStages; - } -} diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 2480910976230877e29c35308f44c800c7001f70..a41f923399bf9bd9a877466a9a078092d0a86c66 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -27,6 +27,12 @@ import teetime.framework.signal.ISignal; */ public final class DummyPipe<T> implements IPipe<T> { + public static final IPipe<?> INSTANCE = new DummyPipe<Object>(); + + private DummyPipe() { + // singleton + } + @Override public boolean add(final Object element) { return true; diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index df9fefcc9d2692a6d014546831c35e6339e804a5..60ef0944313ea785461e9da8409210a389cb06f6 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -46,6 +46,5 @@ public class CipherConfiguration extends Configuration { connectPorts(comp.getOutputPort(), decomp.getInputPort()); connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); connectPorts(decrypt.getOutputPort(), writer.getInputPort()); - } } diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java index b176ce5e62df62436145f3e0138a4b9f89af0b90..bfe97535528dff110483fcae8c995b2603fb89ae 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java @@ -42,7 +42,7 @@ public class TokenizerTest { final String password = "Password"; final TokenizerConfiguration configuration = new TokenizerConfiguration(inputFile, password); - final Execution execution = new Execution(configuration); + final Execution<TokenizerConfiguration> execution = new Execution<TokenizerConfiguration>(configuration); execution.executeBlocking(); final String string = Files.toString(new File("src/test/resources/data/input.txt"), Charset.forName("UTF-8")); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index 2d16fcb80a14aa31b3213a8f649ec7777b8c76e8..251a49610d2833610eded1320455997fb60842be 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -34,7 +34,7 @@ public class RunnableConsumerStageTest { public void testWaitingInfinitely() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(); - final Execution execution = new Execution(configuration); + final Execution<?> execution = new Execution<RunnableConsumerStageTestConfiguration>(configuration); final Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -59,7 +59,7 @@ public class RunnableConsumerStageTest { public void testCorrectStartAndTerminatation() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(0, 1, 2, 3, 5); - final Execution execution = new Execution(configuration); + final Execution<?> execution = new Execution<RunnableConsumerStageTestConfiguration>(configuration); start(execution); assertEquals(5, configuration.getCollectedElements().size()); @@ -109,7 +109,7 @@ public class RunnableConsumerStageTest { public void testYieldRun() throws Exception { YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); - final Execution execution = new Execution(waitStrategyConfiguration); + final Execution<?> execution = new Execution<YieldStrategyConfiguration>(waitStrategyConfiguration); start(execution); @@ -117,7 +117,7 @@ public class RunnableConsumerStageTest { assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); } - private void start(final Execution execution) { + private void start(final Execution<?> execution) { Collection<ThreadThrowableContainer> exceptions = new ArrayList<ThreadThrowableContainer>(); try { execution.executeBlocking(); diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index a2569338181835a207f2921b5c53527ca55f7413..fde7dda3378c65ec679990c4c3c3c2ecd6f6aff5 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -20,11 +20,14 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; +import teetime.framework.pipe.DummyPipe; + public class RunnableProducerStageTest { @Test public void testInit() { RunnableTestStage testStage = new RunnableTestStage(); + testStage.getOutputPort().setPipe(DummyPipe.INSTANCE); RunnableProducerStage runnable = new RunnableProducerStage(testStage); Thread thread = new Thread(runnable); thread.start(); diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 2cf31d619e854a7191e6f65d0a5191ffb912b80e..937a3b2fd0d6c944354ca426a5e6371f3478c0bb 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -48,7 +48,7 @@ public class StageTest { public void testSetOwningThread() throws Exception { TestConfig tc = new TestConfig(); new Execution<TestConfig>(tc); - assertEquals(tc.init.owningThread, tc.delay.owningThread); + assertEquals(tc.init.getOwningThread(), tc.delay.getOwningThread()); assertThat(tc.delay.exceptionListener, is(notNullValue())); assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener); } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraverserTest.java similarity index 95% rename from src/test/java/teetime/framework/TraversorTest.java rename to src/test/java/teetime/framework/TraverserTest.java index 35938bb5a6446fa3b3855989d0e3d915d10096b3..e8272835efa5212e030712804704dfb9a34fffea 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -35,9 +35,9 @@ import teetime.stage.io.File2SeqOfWords; import teetime.stage.string.WordCounter; import teetime.stage.util.CountingMap; -public class TraversorTest { +public class TraverserTest { - private final Traversor traversor = new Traversor(new IntraStageCollector()); + private final Traverser traversor = new Traverser(new IntraStageCollector()); @Test public void traverse() { @@ -52,7 +52,7 @@ public class TraversorTest { OutputPort<?> distributorOutputPort0 = tc.distributor.getOutputPorts().get(0); assertThat(tc.distributor.getOwningThread(), is(not(distributorOutputPort0.pipe.getTargetPort().getOwningStage().getOwningThread()))); - assertEquals(comparingStages, traversor.getVisitedStage()); + assertEquals(comparingStages, traversor.getVisitedStages()); } // WordCounterConfiguration diff --git a/src/test/resources/data/output.txt b/src/test/resources/data/output.txt index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..5c016264f5118569bef2312dcd2f2f97bfd7d292 100644 --- a/src/test/resources/data/output.txt +++ b/src/test/resources/data/output.txt @@ -0,0 +1,11 @@ +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis. +At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat. +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo \ No newline at end of file diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index e1bcc378314eb2349091da6e361afec781a42692..d713e3777ffefba16ac7f1d9f320c5d3dcbfd31f 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -22,7 +22,8 @@ <logger name="teetime" level="INFO" /> <logger name="teetime.framework" level="TRACE" /> - <logger name="teetime.stage.InitialElementProducer" level="DEBUG" /> + <logger name="teetime.stage.InitialElementProducer" level="TRACE" /> + <logger name="teetime.stage.CollectorSink" level="TRACE" /> <logger name="teetime.stage.merger" level="TRACE" /> <!-- <logger name="teetime.stage" level="TRACE" /> --> <!-- <logger name="teetime.framework.signal" level="TRACE" /> -->