From f5931be750cd33f0218b680b227e03b2f703f83c Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 31 Jul 2015 14:39:10 +0200 Subject: [PATCH] added new working context concept --- .../teetime/framework/A0UnconnectedPort.java | 21 +++ .../framework/A1PipeInstantiation.java | 56 +++++++ .../framework/A2ThreadableStageCollector.java | 35 ++++ .../A3InvalidThreadAssignmentCheck.java | 79 +++++++++ .../framework/A4StageAttributeSetter.java | 37 +++++ .../framework/AbstractCompositeStage.java | 8 +- .../java/teetime/framework/AbstractPipe.java | 5 + .../framework/AbstractRunnableStage.java | 18 ++- .../teetime/framework/AbstractService.java | 2 +- .../java/teetime/framework/AbstractStage.java | 20 +-- .../java/teetime/framework/Configuration.java | 27 +++- .../framework/ConfigurationContext.java | 107 ++++++------- .../teetime/framework/DynamicActuator.java | 4 +- .../java/teetime/framework/Execution.java | 22 +-- .../framework/ExecutionInstantiation.java | 4 +- .../java/teetime/framework/IPortVisitor.java | 7 + .../framework/RunnableConsumerStage.java | 3 +- src/main/java/teetime/framework/Stage.java | 16 +- .../teetime/framework/StageCollector.java | 13 -- .../java/teetime/framework/TeeTimeThread.java | 29 ++++ .../java/teetime/framework/ThreadService.java | 150 +++++++----------- .../java/teetime/framework/Traverser.java | 97 +++++++++++ .../java/teetime/framework/Traversor.java | 72 --------- .../teetime/framework/pipe/DummyPipe.java | 6 + .../examples/cipher/CipherConfiguration.java | 1 - .../examples/tokenizer/TokenizerTest.java | 2 +- .../framework/RunnableConsumerStageTest.java | 8 +- .../framework/RunnableProducerStageTest.java | 3 + .../java/teetime/framework/StageTest.java | 2 +- ...{TraversorTest.java => TraverserTest.java} | 6 +- src/test/resources/data/output.txt | 11 ++ src/test/resources/logback-test.xml | 3 +- 32 files changed, 579 insertions(+), 295 deletions(-) create mode 100644 src/main/java/teetime/framework/A0UnconnectedPort.java create mode 100644 src/main/java/teetime/framework/A1PipeInstantiation.java create mode 100644 src/main/java/teetime/framework/A2ThreadableStageCollector.java create mode 100644 src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java create mode 100644 src/main/java/teetime/framework/A4StageAttributeSetter.java create mode 100644 src/main/java/teetime/framework/IPortVisitor.java delete mode 100644 src/main/java/teetime/framework/StageCollector.java create mode 100644 src/main/java/teetime/framework/TeeTimeThread.java create mode 100644 src/main/java/teetime/framework/Traverser.java delete mode 100644 src/main/java/teetime/framework/Traversor.java rename src/test/java/teetime/framework/{TraversorTest.java => TraverserTest.java} (95%) diff --git a/src/main/java/teetime/framework/A0UnconnectedPort.java b/src/main/java/teetime/framework/A0UnconnectedPort.java new file mode 100644 index 00000000..86ef46d1 --- /dev/null +++ b/src/main/java/teetime/framework/A0UnconnectedPort.java @@ -0,0 +1,21 @@ +package teetime.framework; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.DummyPipe; + +public class A0UnconnectedPort implements IPortVisitor { + + private static final Logger LOGGER = LoggerFactory.getLogger(A0UnconnectedPort.class); + + @Override + public void visit(final AbstractPort<?> port) { + if (port.getPipe() == null) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Unconnected output port: " + port + ". Connecting with a dummy output port."); + } + port.setPipe(DummyPipe.INSTANCE); + } + } +} diff --git a/src/main/java/teetime/framework/A1PipeInstantiation.java b/src/main/java/teetime/framework/A1PipeInstantiation.java new file mode 100644 index 00000000..3b7c8aab --- /dev/null +++ b/src/main/java/teetime/framework/A1PipeInstantiation.java @@ -0,0 +1,56 @@ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; + +public class A1PipeInstantiation implements IPipeVisitor { + + private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + + private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); + + @Override + public VisitorBehavior visit(final IPipe<?> pipe) { + if (visitedPipes.contains(pipe)) { + return VisitorBehavior.STOP; + } + visitedPipes.add(pipe); + + instantiatePipe(pipe); + + return VisitorBehavior.CONTINUE; + } + + private <T> void instantiatePipe(final IPipe<T> pipe) { + if (!(pipe instanceof InstantiationPipe)) { // if manually connected + return; + } + + Thread sourceStageThread = pipe.getSourcePort().getOwningStage().getOwningThread(); + Thread targetStageThread = pipe.getTargetPort().getOwningStage().getOwningThread(); + + if (targetStageThread != null && sourceStageThread != targetStageThread) { + // inter + if (pipe.capacity() != 0) { + interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + } else { + interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + } + return; + } else { + // normal or reflexive pipe => intra + } + + intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + } + +} diff --git a/src/main/java/teetime/framework/A2ThreadableStageCollector.java b/src/main/java/teetime/framework/A2ThreadableStageCollector.java new file mode 100644 index 00000000..efe44233 --- /dev/null +++ b/src/main/java/teetime/framework/A2ThreadableStageCollector.java @@ -0,0 +1,35 @@ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.pipe.IPipe; + +public class A2ThreadableStageCollector implements IPipeVisitor { + + private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); + + public Set<Stage> getThreadableStages() { + return threadableStages; + } + + @Override + public VisitorBehavior visit(final IPipe<?> pipe) { + if (visitedPipes.contains(pipe)) { + return VisitorBehavior.STOP; + } + visitedPipes.add(pipe); + + collectThreadableStage(pipe.getSourcePort().getOwningStage()); + collectThreadableStage(pipe.getTargetPort().getOwningStage()); + + return VisitorBehavior.CONTINUE; + } + + private void collectThreadableStage(final Stage stage) { + if (stage.getOwningThread() != null && !threadableStages.contains(stage)) { + threadableStages.add(stage); + } + } +} diff --git a/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java new file mode 100644 index 00000000..4120bae5 --- /dev/null +++ b/src/main/java/teetime/framework/A3InvalidThreadAssignmentCheck.java @@ -0,0 +1,79 @@ +package teetime.framework; + +import java.util.Set; + +import teetime.framework.pipe.DummyPipe; +import teetime.framework.pipe.IPipe; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; + +public class A3InvalidThreadAssignmentCheck { + + private static final int DEFAULT_COLOR = 0; + + private final Set<Stage> threadableStages; + + public A3InvalidThreadAssignmentCheck(final Set<Stage> threadableStages) { + this.threadableStages = threadableStages; + } + + public void check() { + int color = DEFAULT_COLOR; + ObjectIntMap<Stage> colors = new ObjectIntHashMap<Stage>(); + + for (Stage threadableStage : threadableStages) { + color++; + colors.put(threadableStage, color); + + ThreadPainter threadPainter = new ThreadPainter(colors, color, threadableStages); + threadPainter.check(threadableStage); + } + } + + private static class ThreadPainter { + + private final ObjectIntMap<Stage> colors; + private final int color; + private final Set<Stage> threadableStages; + + public ThreadPainter(final ObjectIntMap<Stage> colors, final int color, final Set<Stage> threadableStages) { + super(); + this.colors = colors; + this.color = color; + this.threadableStages = threadableStages; + } + + // TODO consider to implement it as IPipeVisitor(FORWARD) + + public void check(final Stage stage) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + if (outputPort.pipe != DummyPipe.INSTANCE) { + Stage nextStage = checkPipe(outputPort.pipe); + if (nextStage != null) { + check(nextStage); + } + } + } + } + + private Stage checkPipe(final IPipe<?> pipe) { + Stage targetStage = pipe.getTargetPort().getOwningStage(); + int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; + + if (threadableStages.contains(targetStage) && targetColor != color) { + // do nothing + } else { + if (colors.containsKey(targetStage)) { + if (colors.get(targetStage) != color) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + colors.put(targetStage, color); + return targetStage; + } + return null; + } + + } +} diff --git a/src/main/java/teetime/framework/A4StageAttributeSetter.java b/src/main/java/teetime/framework/A4StageAttributeSetter.java new file mode 100644 index 00000000..862e95cb --- /dev/null +++ b/src/main/java/teetime/framework/A4StageAttributeSetter.java @@ -0,0 +1,37 @@ +package teetime.framework; + +import java.util.Set; + +public class A4StageAttributeSetter { + + private final Configuration configuration; + private final Set<Stage> threadableStages; + + public A4StageAttributeSetter(final Configuration configuration, final Set<Stage> threadableStages) { + super(); + this.configuration = configuration; + this.threadableStages = threadableStages; + } + + public void setAttributes() { + for (Stage threadableStage : threadableStages) { + IPipeVisitor pipeVisitor = new IntraStageCollector(); + Traverser traverser = new Traverser(pipeVisitor); + traverser.traverse(threadableStage); + + setAttributes(threadableStage, traverser.getVisitedStages()); + } + } + + private void setAttributes(final Stage threadableStage, final Set<Stage> intraStages) { + threadableStage.setExceptionHandler(configuration.getFactory().createInstance()); + // threadableStage.setOwningThread(owningThread); + threadableStage.setOwningContext(configuration.getContext()); + + for (Stage stage : intraStages) { + stage.setExceptionHandler(threadableStage.exceptionListener); + stage.setOwningThread(threadableStage.getOwningThread()); + stage.setOwningContext(threadableStage.getOwningContext()); + } + } +} diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 2a1efcf0..93c9870e 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -60,9 +60,11 @@ public abstract class AbstractCompositeStage { * @param threadName * A string which can be used for debugging. */ - protected final void addThreadableStage(final Stage stage, final String threadName) { + protected void addThreadableStage(final Stage stage, final String threadName) { // context.addThreadableStage(stage, threadName); - stage.setOwningThread(new Thread(threadName)); + AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); + Thread newThread = new TeeTimeThread(runnable, threadName); + stage.setOwningThread(newThread); } /** @@ -91,7 +93,7 @@ public abstract class AbstractCompositeStage { * @param <T> * the type of elements to be sent */ - protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { // context.connectPorts(sourcePort, targetPort, capacity); connectPortsInternal(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 31b3fbd6..55c1f55b 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -69,4 +69,9 @@ public abstract class AbstractPipe<T> implements IPipe<T> { public final int capacity() { return capacity; } + + @Override + public String toString() { + return sourcePort.getOwningStage().getId() + " -> " + targetPort.getOwningStage().getId() + " (" + super.toString() + ")"; + } } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 2fe5827c..10d0a890 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -28,17 +28,13 @@ abstract class AbstractRunnableStage implements Runnable { @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; - public AbstractRunnableStage(final Stage stage) { + protected AbstractRunnableStage(final Stage stage) { if (stage == null) { throw new IllegalArgumentException("Argument stage may not be null"); } this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); - - if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getThreadService().getRunnableCounter().inc(); - } } @Override @@ -55,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable { } while (!Thread.currentThread().isInterrupted()); } catch (TerminateException e) { this.stage.terminate(); - stage.owningContext.abortConfigurationRun(); + stage.getOwningContext().abortConfigurationRun(); } finally { afterStageExecution(); } @@ -68,7 +64,7 @@ abstract class AbstractRunnableStage implements Runnable { } } finally { if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) { - stage.owningContext.getThreadService().getRunnableCounter().dec(); + stage.getOwningContext().getThreadService().getRunnableCounter().dec(); } } @@ -81,4 +77,12 @@ abstract class AbstractRunnableStage implements Runnable { protected abstract void afterStageExecution(); + public static AbstractRunnableStage create(final Stage stage) { + if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { + return new RunnableConsumerStage(stage); + } else { + return new RunnableProducerStage(stage); + } + } + } diff --git a/src/main/java/teetime/framework/AbstractService.java b/src/main/java/teetime/framework/AbstractService.java index 029a6ef6..c103b03c 100644 --- a/src/main/java/teetime/framework/AbstractService.java +++ b/src/main/java/teetime/framework/AbstractService.java @@ -22,6 +22,6 @@ abstract class AbstractService<T> { abstract void onFinish(); - abstract void merge(T source); + // abstract void merge(T source); } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 4d0218d2..ec707013 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -19,7 +19,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -28,8 +27,6 @@ import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { - private static final IPipe DUMMY_PIPE = new DummyPipe(); - private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); @@ -90,22 +87,9 @@ public abstract class AbstractStage extends Stage { @Override public void onInitializing() throws Exception { - this.connectUnconnectedOutputPorts(); changeState(StageState.INITIALIZED); } - @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.outputPorts.getOpenedPorts()) { - if (null == outputPort.getPipe()) { // if port is unconnected - if (logger.isInfoEnabled()) { - this.logger.info("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); - } - outputPort.setPipe(DUMMY_PIPE); - } - } - } - private void changeState(final StageState newState) { currentState = newState; logger.trace(newState.toString()); @@ -257,7 +241,7 @@ public abstract class AbstractStage extends Stage { @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { - final IPipe pipe = outputPort.getPipe(); + final IPipe<?> pipe = outputPort.getPipe(); final Class<?> sourcePortType = outputPort.getType(); final Class<?> targetPortType = pipe.getTargetPort().getType(); @@ -271,7 +255,7 @@ public abstract class AbstractStage extends Stage { @Override protected void terminate() { changeState(StageState.TERMINATING); - owningThread.interrupt(); + getOwningThread().interrupt(); } @Override diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 77a3d00b..418569c6 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -28,9 +28,11 @@ import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; */ public abstract class Configuration extends AbstractCompositeStage { - private boolean executed; - private final IExceptionListenerFactory factory; + private final ConfigurationContext context; + + private boolean executed; + private Stage startStage; protected Configuration() { this(new TerminatingExceptionListenerFactory()); @@ -38,6 +40,7 @@ public abstract class Configuration extends AbstractCompositeStage { protected Configuration(final IExceptionListenerFactory factory) { this.factory = factory; + this.context = new ConfigurationContext(this); } boolean isExecuted() { @@ -52,4 +55,24 @@ public abstract class Configuration extends AbstractCompositeStage { return factory; } + @Override + protected void addThreadableStage(final Stage stage, final String threadName) { + startStage = stage; // memorize an arbitrary stage as starting point for traversing + super.addThreadableStage(stage, threadName); + } + + @Override + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + startStage = sourcePort.getOwningStage(); // memorize an arbitrary stage as starting point for traversing + super.connectPorts(sourcePort, targetPort, capacity); + } + + ConfigurationContext getContext() { + return context; + } + + Stage getStartStage() { + return startStage; + } + } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 41fe620b..38c9d9cc 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,15 +15,8 @@ */ package teetime.framework; -import java.util.HashSet; -import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import teetime.framework.pipe.InstantiationPipe; - /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. * Stages can be added by executing {@link #addThreadableStage(Stage)}. @@ -32,78 +25,78 @@ import teetime.framework.pipe.InstantiationPipe; */ final class ConfigurationContext { - static final ConfigurationContext EMPTY_CONTEXT = new ConfigurationContext(null); + // static final ConfigurationContext EMPTY_CONTEXT = new ConfigurationContext(null); - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private final Set<ConfigurationContext> children = new HashSet<ConfigurationContext>(); // parent-child-tree + // private final Set<ConfigurationContext> children = new HashSet<ConfigurationContext>(); // parent-child-tree private ThreadService threadService; - ConfigurationContext(final AbstractCompositeStage compositeStage) { - this.threadService = new ThreadService(compositeStage); + ConfigurationContext(final Configuration configuration) { + this.threadService = new ThreadService(configuration); } - Map<Stage, String> getThreadableStages() { + Set<Stage> getThreadableStages() { return threadService.getThreadableStages(); } /** * @see AbstractCompositeStage#addThreadableStage(Stage) */ - final void addThreadableStage(final Stage stage, final String threadName) { - addChildContext(stage); - threadService.addThreadableStage(stage, threadName); - } + // final void addThreadableStage(final Stage stage, final String threadName) { + // addChildContext(stage); + // threadService.addThreadableStage(stage, threadName); + // } /** * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ - final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().size() == 0) { - if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { - addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); - } - } - - if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) { - LOGGER.warn("Overwriting existing pipe while connecting stages " + - sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); - } - - addChildContext(sourcePort.getOwningStage()); - addChildContext(targetPort.getOwningStage()); - - new InstantiationPipe(sourcePort, targetPort, capacity); - } - - final void addChildContext(final Stage stage) { - if (!stage.owningContext.equals(EMPTY_CONTEXT)) { - if (stage.owningContext != this) { // Performance - children.add(stage.owningContext); - } - } else { - stage.owningContext = this; - } - } - - final void initializeContext() { - for (ConfigurationContext child : children) { - child.initializeContext(); - mergeContexts(child); - } - } + // final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + // if (sourcePort.getOwningStage().getInputPorts().size() == 0) { + // if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { + // addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + // } + // } + // + // if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) { + // LOGGER.warn("Overwriting existing pipe while connecting stages " + + // sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); + // } + // + // addChildContext(sourcePort.getOwningStage()); + // addChildContext(targetPort.getOwningStage()); + // + // new InstantiationPipe(sourcePort, targetPort, capacity); + // } + + // final void addChildContext(final Stage stage) { + // if (!stage.owningContext.equals(EMPTY_CONTEXT)) { + // if (stage.owningContext != this) { // Performance + // children.add(stage.owningContext); + // } + // } else { + // stage.owningContext = this; + // } + // } + // + // final void initializeContext() { + // for (ConfigurationContext child : children) { + // child.initializeContext(); + // mergeContexts(child); + // } + // } final void initializeServices() { threadService.onInitialize(); } - private void mergeContexts(final ConfigurationContext child) { - threadService.merge(child.getThreadService()); - - // Finally copy parent services - child.threadService = this.threadService; - } + // private void mergeContexts(final ConfigurationContext child) { + // threadService.merge(child.getThreadService()); + // + // // Finally copy parent services + // child.threadService = this.threadService; + // } void executeConfiguration() { this.threadService.onExecute(); diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index ee63f6d6..523b489e 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -31,8 +31,8 @@ public class DynamicActuator { } public Runnable startWithinNewThread(final Stage previousStage, final Stage stage) { - SignalingCounter runtimeCounter = previousStage.owningContext.getThreadService().getRunnableCounter(); - SignalingCounter newCounter = stage.owningContext.getThreadService().getRunnableCounter(); + SignalingCounter runtimeCounter = previousStage.getOwningContext().getThreadService().getRunnableCounter(); + SignalingCounter newCounter = stage.getOwningContext().getThreadService().getRunnableCounter(); // runtimeCounter.inc(newCounter); // stage.logger.error(stage.owningContext.getThreadService().getRunnableCounter().toString()); diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 3e716268..e6e60886 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -15,12 +15,11 @@ */ package teetime.framework; -import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.Traversor.Direction; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -43,7 +42,6 @@ public final class Execution<T extends Configuration> { private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); private final T configuration; - private final ConfigurationContext configurationContext; /** @@ -68,8 +66,7 @@ public final class Execution<T extends Configuration> { */ public Execution(final T configuration, final boolean validationEnabled) { this.configuration = configuration; - // this.configurationContext = configuration.getContext(); - this.configurationContext = new ConfigurationContext(configuration); + this.configurationContext = configuration.getContext(); if (configuration.isExecuted()) { throw new IllegalStateException("Configuration was already executed"); } @@ -82,8 +79,8 @@ public final class Execution<T extends Configuration> { // BETTER validate concurrently private void validateStages() { - final Map<Stage, String> threadableStageJobs = configurationContext.getThreadableStages(); - for (Stage stage : threadableStageJobs.keySet()) { + final Set<Stage> threadableStages = configurationContext.getThreadableStages(); + for (Stage stage : threadableStages) { // // portConnectionValidator.validate(stage); // } @@ -100,15 +97,10 @@ public final class Execution<T extends Configuration> { * */ private final void init() { - ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configurationContext); - executionInstantiation.instantiatePipes(); - - IPipeVisitor pipeVisitor = new StageCollector(); - Traversor traversor = new Traversor(pipeVisitor, Direction.BOTH); - // TODO iterate through each producer - // traversor.traverse(stage); + // ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configurationContext); + // executionInstantiation.instantiatePipes(); - configurationContext.initializeContext(); + // configurationContext.initializeContext(); configurationContext.initializeServices(); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 4334cf42..ddb6188c 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -34,14 +34,14 @@ class ExecutionInstantiation { private final ConfigurationContext context; - public ExecutionInstantiation(final ConfigurationContext context) { + private ExecutionInstantiation(final ConfigurationContext context) { this.context = context; } void instantiatePipes() { int color = DEFAULT_COLOR; Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStages = context.getThreadableStages().keySet(); + Set<Stage> threadableStages = context.getThreadableStages(); for (Stage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); diff --git a/src/main/java/teetime/framework/IPortVisitor.java b/src/main/java/teetime/framework/IPortVisitor.java new file mode 100644 index 00000000..28f314b3 --- /dev/null +++ b/src/main/java/teetime/framework/IPortVisitor.java @@ -0,0 +1,7 @@ +package teetime.framework; + +public interface IPortVisitor { + + void visit(AbstractPort<?> port); + +} diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index a23ae092..03ecfdc2 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -32,10 +32,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage { @Override protected void beforeStageExecution() throws InterruptedException { + logger.trace("waitForInitializingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForInitializingSignal(); } - + logger.trace("waitForStartingSignal"); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 8b51301e..7af474dd 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -48,9 +48,17 @@ public abstract class Stage { protected AbstractExceptionListener exceptionListener; /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ - protected Thread owningThread; + private Thread owningThread; - ConfigurationContext owningContext = ConfigurationContext.EMPTY_CONTEXT; + private ConfigurationContext owningContext; + + ConfigurationContext getOwningContext() { + return owningContext; + } + + void setOwningContext(final ConfigurationContext owningContext) { + this.owningContext = owningContext; + } protected Stage() { this.id = this.createId(); @@ -135,6 +143,10 @@ public abstract class Stage { } void setOwningThread(final Thread owningThread) { + if (this.owningThread != null && this.owningThread != owningThread) { + // checks also for "crossing threads" + // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); + } this.owningThread = owningThread; } diff --git a/src/main/java/teetime/framework/StageCollector.java b/src/main/java/teetime/framework/StageCollector.java deleted file mode 100644 index 89d28ad7..00000000 --- a/src/main/java/teetime/framework/StageCollector.java +++ /dev/null @@ -1,13 +0,0 @@ -package teetime.framework; - -import teetime.framework.pipe.IPipe; - -public class StageCollector implements IPipeVisitor { - - @Override - public VisitorBehavior visit(IPipe outputPipe) { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/src/main/java/teetime/framework/TeeTimeThread.java b/src/main/java/teetime/framework/TeeTimeThread.java new file mode 100644 index 00000000..00e96fac --- /dev/null +++ b/src/main/java/teetime/framework/TeeTimeThread.java @@ -0,0 +1,29 @@ +package teetime.framework; + +public class TeeTimeThread extends Thread { + + private final AbstractRunnableStage runnable; + + public TeeTimeThread(final AbstractRunnableStage runnable, final String name) { + super(runnable, name); + this.runnable = runnable; + } + + public void sendInitializingSignal() { + if (runnable instanceof RunnableProducerStage) { + ((RunnableProducerStage) runnable).triggerInitializingSignal(); + } + } + + public void sendStartingSignal() { + if (runnable instanceof RunnableProducerStage) { + ((RunnableProducerStage) runnable).triggerStartingSignal(); + } + } + + @Override + public synchronized void start() { + runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc(); + super.start(); + } +} diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 7c76df6f..b80e0a36 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -1,17 +1,14 @@ package teetime.framework; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.exceptionHandling.AbstractExceptionListener; -import teetime.framework.exceptionHandling.IExceptionListenerFactory; +import teetime.framework.Traverser.Direction; import teetime.util.framework.concurrent.SignalingCounter; /** @@ -23,8 +20,6 @@ import teetime.util.framework.concurrent.SignalingCounter; */ class ThreadService extends AbstractService<ThreadService> { - private Map<Stage, String> threadableStages = new HashMap<Stage, String>(); - private static final Logger LOGGER = LoggerFactory.getLogger(ThreadService.class); private final List<Thread> consumerThreads = new LinkedList<Thread>(); @@ -32,36 +27,67 @@ class ThreadService extends AbstractService<ThreadService> { private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); private final SignalingCounter runnableCounter = new SignalingCounter(); + private final Configuration configuration; - private final AbstractCompositeStage compositeStage; + private Set<Stage> threadableStages; - public ThreadService(final AbstractCompositeStage compositeStage) { - this.compositeStage = compositeStage; + public ThreadService(final Configuration configuration) { + this.configuration = configuration; } - private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); - @Override void onInitialize() { - // is invoked only by a Configuration - IExceptionListenerFactory factory = ((Configuration) compositeStage).getFactory(); + Stage startStage = configuration.getStartStage(); + if (startStage == null) { + throw new IllegalStateException("The start stage may not be null."); + } + + // TODO visit(port) only + // TODO use decorator pattern to combine all analyzes so that only one traverser pass is necessary + IPortVisitor portVisitor = new A0UnconnectedPort(); + IPipeVisitor pipeVisitor = new A1PipeInstantiation(); + Traverser traversor = new Traverser(portVisitor, pipeVisitor, Direction.BOTH); + traversor.traverse(startStage); + + A2ThreadableStageCollector stageCollector = new A2ThreadableStageCollector(); + traversor = new Traverser(stageCollector, Direction.BOTH); + traversor.traverse(startStage); + threadableStages = stageCollector.getThreadableStages(); if (threadableStages.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } - for (Stage stage : threadableStages.keySet()) { - final Thread thread = initializeStage(stage); + A3InvalidThreadAssignmentCheck checker = new A3InvalidThreadAssignmentCheck(threadableStages); + checker.check(); - final Set<Stage> intraStages = traverseIntraStages(stage); + A4StageAttributeSetter attributeSetter = new A4StageAttributeSetter(configuration, threadableStages); + attributeSetter.setAttributes(); - final AbstractExceptionListener newListener = factory.createInstance(); - initializeIntraStages(intraStages, thread, newListener); + for (Stage stage : threadableStages) { + categorizeThreadableStage(stage); } onStart(); } + private void categorizeThreadableStage(final Stage stage) { + switch (stage.getTerminationStrategy()) { + case BY_INTERRUPT: + infiniteProducerThreads.add(stage.getOwningThread()); + break; + case BY_SELF_DECISION: + finiteProducerThreads.add(stage.getOwningThread()); + break; + case BY_SIGNAL: + consumerThreads.add(stage.getOwningThread()); + break; + default: + LOGGER.warn("Unknown termination strategy '" + stage.getTerminationStrategy() + "' in stage " + stage); + break; + } + } + @Override void onStart() { startThreads(this.consumerThreads); @@ -78,7 +104,7 @@ class ThreadService extends AbstractService<ThreadService> { @Override void onTerminate() { - for (Stage stage : threadableStages.keySet()) { + for (Stage stage : threadableStages) { stage.terminate(); } } @@ -122,62 +148,6 @@ class ThreadService extends AbstractService<ThreadService> { return exceptions; } - private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { - for (Stage intraStage : intraStages) { - intraStage.setOwningThread(thread); - intraStage.setExceptionHandler(newListener); - } - } - - private Thread initializeStage(final Stage stage) { - final Thread thread; - - final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); - switch (terminationStrategy) { - case BY_SIGNAL: { - final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - thread = createThread(runnable, stage.getId()); - this.consumerThreads.add(thread); - break; - } - case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.finiteProducerThreads.add(thread); - break; - } - case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.infiniteProducerThreads.add(thread); - break; - } - default: - throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); - } - return thread; - } - - private Thread createThread(final AbstractRunnableStage runnable, final String name) { - final Thread thread = new Thread(runnable); - thread.setName(threadableStages.get(runnable.stage)); - return thread; - } - - private Set<Stage> traverseIntraStages(final Stage stage) { - final Traversor traversor = new Traversor(new IntraStageCollector()); - traversor.traverse(stage); - return traversor.getVisitedStage(); - } - - void addThreadableStage(final Stage stage, final String threadName) { - if (this.threadableStages.put(stage, threadName) != null && LOGGER.isWarnEnabled()) { - LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); - } - } - private void startThreads(final Iterable<Thread> threads) { for (Thread thread : threads) { thread.start(); @@ -185,30 +155,32 @@ class ThreadService extends AbstractService<ThreadService> { } private void sendInitializingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerInitializingSignal(); + for (Thread thread : infiniteProducerThreads) { + ((TeeTimeThread) thread).sendInitializingSignal(); + } + for (Thread thread : finiteProducerThreads) { + ((TeeTimeThread) thread).sendInitializingSignal(); } } private void sendStartingSignal() { - for (RunnableProducerStage runnable : producerRunnables) { - runnable.triggerStartingSignal(); + for (Thread thread : infiniteProducerThreads) { + ((TeeTimeThread) thread).sendStartingSignal(); + } + for (Thread thread : finiteProducerThreads) { + ((TeeTimeThread) thread).sendStartingSignal(); } } - Map<Stage, String> getThreadableStages() { + Set<Stage> getThreadableStages() { return threadableStages; } - void setThreadableStages(final Map<Stage, String> threadableStages) { - this.threadableStages = threadableStages; - } - - @Override - void merge(final ThreadService source) { - threadableStages.putAll(source.getThreadableStages()); - // runnableCounter.inc(source.runnableCounter); - } + // @Override + // void merge(final ThreadService source) { + // threadableStages.putAll(source.getThreadableStages()); + // // runnableCounter.inc(source.runnableCounter); + // } SignalingCounter getRunnableCounter() { return runnableCounter; diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java new file mode 100644 index 00000000..d430e33d --- /dev/null +++ b/src/main/java/teetime/framework/Traverser.java @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework; + +import java.util.HashSet; +import java.util.Set; + +import teetime.framework.IPipeVisitor.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; +import teetime.framework.pipe.IPipe; + +public class Traverser { + + public static enum Direction { + BACKWARD(1), FORWARD(2), BOTH(BACKWARD.value | FORWARD.value); + + private final int value; + + private Direction(final int value) { + this.value = value; + } + + public boolean represents(final Direction direction) { + return (value & direction.value) == direction.value; + } + } + + private static final IPortVisitor DEFAULT_PORT_VISITOR = new IPortVisitor() { + @Override + public void visit(final AbstractPort<?> port) { + // do nothing + } + }; + + private final IPortVisitor portVisitor; + private final IPipeVisitor pipeVisitor; + private final Direction direction; + private final Set<Stage> visitedStages = new HashSet<Stage>(); + + public Traverser(final IPipeVisitor pipeVisitor) { + this(pipeVisitor, Direction.FORWARD); + } + + public Traverser(final IPipeVisitor pipeVisitor, final Direction direction) { + this(DEFAULT_PORT_VISITOR, pipeVisitor, direction); + } + + public Traverser(final IPortVisitor portVisitor, final IPipeVisitor pipeVisitor, final Direction direction) { + this.portVisitor = portVisitor; + this.pipeVisitor = pipeVisitor; + this.direction = direction; + } + + public void traverse(final Stage stage) { + if (!visitedStages.add(stage)) { + return; + } + + if (direction.represents(Direction.FORWARD)) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + visitAndTraverse(outputPort, Direction.FORWARD); + } + } + + if (direction.represents(Direction.BACKWARD)) { + for (InputPort<?> inputPort : stage.getInputPorts()) { + visitAndTraverse(inputPort, Direction.BACKWARD); + } + } + } + + private void visitAndTraverse(final AbstractPort<?> port, final Direction direction) { + portVisitor.visit(port); + IPipe<?> pipe = port.getPipe(); + if (pipe != DummyPipe.INSTANCE && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { + AbstractPort<?> nextPort = (direction == Direction.FORWARD) ? pipe.getTargetPort() : pipe.getSourcePort(); + traverse(nextPort.getOwningStage()); // recursive call + } + } + + public Set<Stage> getVisitedStages() { + return visitedStages; + } +} diff --git a/src/main/java/teetime/framework/Traversor.java b/src/main/java/teetime/framework/Traversor.java deleted file mode 100644 index 67d3f6c7..00000000 --- a/src/main/java/teetime/framework/Traversor.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -import java.util.HashSet; -import java.util.Set; - -import teetime.framework.IPipeVisitor.VisitorBehavior; -import teetime.framework.pipe.IPipe; - -public class Traversor { - - public static enum Direction { - BACKWARD, FORWARD, BOTH - } - - private final IPipeVisitor pipeVisitor; - private final Direction direction; - private final Set<Stage> visitedStages = new HashSet<Stage>(); - - public Traversor(final IPipeVisitor pipeVisitor) { - this(pipeVisitor, Direction.FORWARD); - } - - public Traversor(final IPipeVisitor pipeVisitor, final Direction direction) { - this.pipeVisitor = pipeVisitor; - this.direction = direction; - } - - public void traverse(final Stage stage) { - if (!visitedStages.add(stage)) { - return; - } - - if (direction == Direction.BOTH || direction == Direction.FORWARD) { - for (OutputPort<?> outputPort : stage.getOutputPorts()) { - visitAndTraverse(outputPort); - } - } - - if (direction == Direction.BOTH || direction == Direction.BACKWARD) { - for (InputPort<?> inputPort : stage.getInputPorts()) { - visitAndTraverse(inputPort); - } - } - } - - private void visitAndTraverse(final AbstractPort<?> port) { - IPipe pipe = port.getPipe(); - if (null != pipe && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { - Stage owningStage = pipe.getTargetPort().getOwningStage(); - traverse(owningStage); // recursive call - } - } - - public Set<Stage> getVisitedStage() { - return visitedStages; - } -} diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 24809109..a41f9233 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -27,6 +27,12 @@ import teetime.framework.signal.ISignal; */ public final class DummyPipe<T> implements IPipe<T> { + public static final IPipe<?> INSTANCE = new DummyPipe<Object>(); + + private DummyPipe() { + // singleton + } + @Override public boolean add(final Object element) { return true; diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index df9fefcc..60ef0944 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -46,6 +46,5 @@ public class CipherConfiguration extends Configuration { connectPorts(comp.getOutputPort(), decomp.getInputPort()); connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); connectPorts(decrypt.getOutputPort(), writer.getInputPort()); - } } diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java index b176ce5e..bfe97535 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java @@ -42,7 +42,7 @@ public class TokenizerTest { final String password = "Password"; final TokenizerConfiguration configuration = new TokenizerConfiguration(inputFile, password); - final Execution execution = new Execution(configuration); + final Execution<TokenizerConfiguration> execution = new Execution<TokenizerConfiguration>(configuration); execution.executeBlocking(); final String string = Files.toString(new File("src/test/resources/data/input.txt"), Charset.forName("UTF-8")); diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index 2d16fcb8..251a4961 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -34,7 +34,7 @@ public class RunnableConsumerStageTest { public void testWaitingInfinitely() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(); - final Execution execution = new Execution(configuration); + final Execution<?> execution = new Execution<RunnableConsumerStageTestConfiguration>(configuration); final Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -59,7 +59,7 @@ public class RunnableConsumerStageTest { public void testCorrectStartAndTerminatation() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(0, 1, 2, 3, 5); - final Execution execution = new Execution(configuration); + final Execution<?> execution = new Execution<RunnableConsumerStageTestConfiguration>(configuration); start(execution); assertEquals(5, configuration.getCollectedElements().size()); @@ -109,7 +109,7 @@ public class RunnableConsumerStageTest { public void testYieldRun() throws Exception { YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); - final Execution execution = new Execution(waitStrategyConfiguration); + final Execution<?> execution = new Execution<YieldStrategyConfiguration>(waitStrategyConfiguration); start(execution); @@ -117,7 +117,7 @@ public class RunnableConsumerStageTest { assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); } - private void start(final Execution execution) { + private void start(final Execution<?> execution) { Collection<ThreadThrowableContainer> exceptions = new ArrayList<ThreadThrowableContainer>(); try { execution.executeBlocking(); diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index a2569338..fde7dda3 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -20,11 +20,14 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; +import teetime.framework.pipe.DummyPipe; + public class RunnableProducerStageTest { @Test public void testInit() { RunnableTestStage testStage = new RunnableTestStage(); + testStage.getOutputPort().setPipe(DummyPipe.INSTANCE); RunnableProducerStage runnable = new RunnableProducerStage(testStage); Thread thread = new Thread(runnable); thread.start(); diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 2cf31d61..937a3b2f 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -48,7 +48,7 @@ public class StageTest { public void testSetOwningThread() throws Exception { TestConfig tc = new TestConfig(); new Execution<TestConfig>(tc); - assertEquals(tc.init.owningThread, tc.delay.owningThread); + assertEquals(tc.init.getOwningThread(), tc.delay.getOwningThread()); assertThat(tc.delay.exceptionListener, is(notNullValue())); assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener); } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraverserTest.java similarity index 95% rename from src/test/java/teetime/framework/TraversorTest.java rename to src/test/java/teetime/framework/TraverserTest.java index 35938bb5..e8272835 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -35,9 +35,9 @@ import teetime.stage.io.File2SeqOfWords; import teetime.stage.string.WordCounter; import teetime.stage.util.CountingMap; -public class TraversorTest { +public class TraverserTest { - private final Traversor traversor = new Traversor(new IntraStageCollector()); + private final Traverser traversor = new Traverser(new IntraStageCollector()); @Test public void traverse() { @@ -52,7 +52,7 @@ public class TraversorTest { OutputPort<?> distributorOutputPort0 = tc.distributor.getOutputPorts().get(0); assertThat(tc.distributor.getOwningThread(), is(not(distributorOutputPort0.pipe.getTargetPort().getOwningStage().getOwningThread()))); - assertEquals(comparingStages, traversor.getVisitedStage()); + assertEquals(comparingStages, traversor.getVisitedStages()); } // WordCounterConfiguration diff --git a/src/test/resources/data/output.txt b/src/test/resources/data/output.txt index e69de29b..5c016264 100644 --- a/src/test/resources/data/output.txt +++ b/src/test/resources/data/output.txt @@ -0,0 +1,11 @@ +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis. +At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat. +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo \ No newline at end of file diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index e1bcc378..d713e377 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -22,7 +22,8 @@ <logger name="teetime" level="INFO" /> <logger name="teetime.framework" level="TRACE" /> - <logger name="teetime.stage.InitialElementProducer" level="DEBUG" /> + <logger name="teetime.stage.InitialElementProducer" level="TRACE" /> + <logger name="teetime.stage.CollectorSink" level="TRACE" /> <logger name="teetime.stage.merger" level="TRACE" /> <!-- <logger name="teetime.stage" level="TRACE" /> --> <!-- <logger name="teetime.framework.signal" level="TRACE" /> --> -- GitLab