From 315f61b6ad2c2caf11e46bdf4cc0e9444903408f Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 22 Sep 2015 14:28:54 +0200 Subject: [PATCH] removed Stage --- .../framework/A1ThreadableStageCollector.java | 6 +- .../A2InvalidThreadAssignmentCheck.java | 18 +- .../framework/A3PipeInstantiation.java | 2 +- .../framework/A4StageAttributeSetter.java | 12 +- .../java/teetime/framework/AbstractPipe.java | 2 +- .../java/teetime/framework/AbstractPort.java | 6 +- .../framework/AbstractRunnableStage.java | 10 +- .../java/teetime/framework/AbstractStage.java | 180 ++++++++++++-- .../java/teetime/framework/Configuration.java | 4 +- .../framework/ConfigurationContext.java | 4 +- .../java/teetime/framework/Execution.java | 4 +- .../framework/ExecutionInstantiation.java | 16 +- .../teetime/framework/ITraverserVisitor.java | 2 +- .../java/teetime/framework/InputPort.java | 2 +- .../framework/IntraStageCollector.java | 6 +- .../java/teetime/framework/OutputPort.java | 2 +- .../framework/RunnableConsumerStage.java | 4 +- .../framework/RunnableProducerStage.java | 2 +- .../framework/RuntimeServiceFacade.java | 2 +- src/main/java/teetime/framework/Stage.java | 219 ------------------ .../java/teetime/framework/ThreadService.java | 32 +-- .../java/teetime/framework/Traverser.java | 6 +- .../AbstractExceptionListener.java | 8 +- .../IgnoringExceptionListener.java | 4 +- .../LoggingExceptionListener.java | 4 +- .../TerminatingExceptionListener.java | 4 +- .../teetime/framework/signal/ISignal.java | 4 +- .../framework/signal/StartingSignal.java | 4 +- .../framework/signal/TerminatingSignal.java | 4 +- .../framework/signal/ValidatingSignal.java | 4 +- .../teetime/framework/test/InputHolder.java | 6 +- .../teetime/framework/test/StageTester.java | 10 +- .../teetime/stage/io/EveryXthPrinter.java | 6 +- .../util/framework/port/PortAction.java | 4 +- .../util/framework/port/PortActionHelper.java | 6 +- ...{StageTest.java => AbstractStageTest.java} | 8 +- .../java/teetime/framework/TraverserTest.java | 2 +- .../framework/WaitStrategyConfiguration.java | 6 +- .../framework/YieldStrategyConfiguration.java | 2 +- .../exceptionHandling/TestListener.java | 4 +- .../dynamic/DynamicDistributorTest.java | 4 +- 41 files changed, 282 insertions(+), 353 deletions(-) delete mode 100644 src/main/java/teetime/framework/Stage.java rename src/test/java/teetime/framework/{StageTest.java => AbstractStageTest.java} (95%) diff --git a/src/main/java/teetime/framework/A1ThreadableStageCollector.java b/src/main/java/teetime/framework/A1ThreadableStageCollector.java index 387ad88f..0ec14de3 100644 --- a/src/main/java/teetime/framework/A1ThreadableStageCollector.java +++ b/src/main/java/teetime/framework/A1ThreadableStageCollector.java @@ -26,14 +26,14 @@ import teetime.framework.pipe.DummyPipe; */ class A1ThreadableStageCollector implements ITraverserVisitor { - private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Set<AbstractStage> threadableStages = new HashSet<AbstractStage>(); - public Set<Stage> getThreadableStages() { + public Set<AbstractStage> getThreadableStages() { return threadableStages; } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) { threadableStages.add(stage); } diff --git a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java index fd471230..1a36cbd3 100644 --- a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java +++ b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java @@ -31,17 +31,17 @@ public class A2InvalidThreadAssignmentCheck { private static final int DEFAULT_COLOR = 0; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public A2InvalidThreadAssignmentCheck(final Set<Stage> threadableStages) { + public A2InvalidThreadAssignmentCheck(final Set<AbstractStage> threadableStages) { this.threadableStages = threadableStages; } public void check() { int color = DEFAULT_COLOR; - ObjectIntMap<Stage> colors = new ObjectIntHashMap<Stage>(); + ObjectIntMap<AbstractStage> colors = new ObjectIntHashMap<AbstractStage>(); - for (Stage threadableStage : threadableStages) { + for (AbstractStage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); @@ -53,11 +53,11 @@ public class A2InvalidThreadAssignmentCheck { private static class ThreadPainter implements ITraverserVisitor { - private final ObjectIntMap<Stage> colors; + private final ObjectIntMap<AbstractStage> colors; private final int color; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public ThreadPainter(final ObjectIntMap<Stage> colors, final int color, final Set<Stage> threadableStages) { + public ThreadPainter(final ObjectIntMap<AbstractStage> colors, final int color, final Set<AbstractStage> threadableStages) { super(); this.colors = colors; this.color = color; @@ -65,7 +65,7 @@ public class A2InvalidThreadAssignmentCheck { } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { return VisitorBehavior.CONTINUE; } @@ -73,7 +73,7 @@ public class A2InvalidThreadAssignmentCheck { public VisitorBehavior visit(final AbstractPort<?> port) { IPipe<?> pipe = port.getPipe(); // FIXME line below requires FORWARD. should be independent of the used direction - Stage targetStage = pipe.getTargetPort().getOwningStage(); + AbstractStage targetStage = pipe.getTargetPort().getOwningStage(); int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index 72afb486..f59a75e7 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -44,7 +44,7 @@ class A3PipeInstantiation implements ITraverserVisitor { private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { return VisitorBehavior.CONTINUE; } diff --git a/src/main/java/teetime/framework/A4StageAttributeSetter.java b/src/main/java/teetime/framework/A4StageAttributeSetter.java index 98373e41..99f45eb9 100644 --- a/src/main/java/teetime/framework/A4StageAttributeSetter.java +++ b/src/main/java/teetime/framework/A4StageAttributeSetter.java @@ -23,21 +23,21 @@ import java.util.Set; class A4StageAttributeSetter { private final Configuration configuration; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public A4StageAttributeSetter(final Configuration configuration, final Set<Stage> threadableStages) { + public A4StageAttributeSetter(final Configuration configuration, final Set<AbstractStage> threadableStages) { super(); this.configuration = configuration; this.threadableStages = threadableStages; } public void setAttributes() { - for (Stage threadableStage : threadableStages) { + for (AbstractStage threadableStage : threadableStages) { setAttributes(threadableStage); } } - private void setAttributes(final Stage threadableStage) { + private void setAttributes(final AbstractStage threadableStage) { IntraStageCollector visitor = new IntraStageCollector(threadableStage); Traverser traverser = new Traverser(visitor); traverser.traverse(threadableStage); @@ -45,12 +45,12 @@ class A4StageAttributeSetter { setAttributes(threadableStage, traverser.getVisitedStages()); } - private void setAttributes(final Stage threadableStage, final Set<Stage> intraStages) { + private void setAttributes(final AbstractStage threadableStage, final Set<AbstractStage> intraStages) { threadableStage.setExceptionHandler(configuration.getFactory().createInstance(threadableStage.getOwningThread())); // threadableStage.setOwningThread(owningThread); threadableStage.setOwningContext(configuration.getContext()); - for (Stage stage : intraStages) { + for (AbstractStage stage : intraStages) { stage.setExceptionHandler(threadableStage.exceptionListener); stage.setOwningThread(threadableStage.getOwningThread()); stage.setOwningContext(threadableStage.getOwningContext()); diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 55c1f55b..1b0f1546 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -26,7 +26,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - protected final Stage cachedTargetStage; + protected final AbstractStage cachedTargetStage; private final OutputPort<? extends T> sourcePort; private final InputPort<T> targetPort; diff --git a/src/main/java/teetime/framework/AbstractPort.java b/src/main/java/teetime/framework/AbstractPort.java index 50693ae3..ce28991e 100644 --- a/src/main/java/teetime/framework/AbstractPort.java +++ b/src/main/java/teetime/framework/AbstractPort.java @@ -27,10 +27,10 @@ public abstract class AbstractPort<T> { * </p> */ private final Class<T> type; - private final Stage owningStage; + private final AbstractStage owningStage; private final String name; - protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) { + protected AbstractPort(final Class<T> type, final AbstractStage owningStage, final String name) { super(); this.type = type; this.owningStage = owningStage; @@ -41,7 +41,7 @@ public abstract class AbstractPort<T> { return this.type; } - public Stage getOwningStage() { + public AbstractStage getOwningStage() { return owningStage; } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index d40c278d..99cc6d09 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -31,13 +31,13 @@ abstract class AbstractRunnableStage implements Runnable { private final StopWatch stopWatch = new StopWatch(); - protected final Stage stage; + protected final AbstractStage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; - public static final Map<Stage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<Stage, Long>()); + public static final Map<AbstractStage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<AbstractStage, Long>()); - protected AbstractRunnableStage(final Stage stage) { + protected AbstractRunnableStage(final AbstractStage stage) { if (stage == null) { throw new IllegalArgumentException("Argument stage may not be null"); } @@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable { @Override public final void run() { - final Stage stage = this.stage; // should prevent the stage to be reloaded after a volatile read + final AbstractStage stage = this.stage; // should prevent the stage to be reloaded after a volatile read final Logger logger = this.logger; // should prevent the logger to be reloaded after a volatile read logger.debug("Executing runnable stage..."); @@ -96,7 +96,7 @@ abstract class AbstractRunnableStage implements Runnable { protected abstract void afterStageExecution(); - static AbstractRunnableStage create(final Stage stage) { + static AbstractRunnableStage create(final AbstractStage stage) { if (stage.getInputPorts().size() > 0) { return new RunnableConsumerStage(stage); } else { diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 447b243c..3891b124 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -20,14 +20,165 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.exceptionHandling.AbstractExceptionListener; +import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; +import teetime.framework.exceptionHandling.TerminateException; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; import teetime.util.framework.port.PortList; import teetime.util.framework.port.PortRemovedListener; -public abstract class AbstractStage extends Stage { +/** + * Represents a minimal Stage, with some pre-defined methods. + * Implemented stages need to adapt all abstract methods with own implementations. + */ +@SuppressWarnings("PMD.AbstractNaming") +public abstract class AbstractStage { + + private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); + private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); + + private final String id; + /** + * A unique logger instance per stage instance + */ + @SuppressWarnings("PMD.LoggerIsNotStaticFinal") + protected final Logger logger; + + protected AbstractExceptionListener exceptionListener; + + /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ + private Thread owningThread; + + private boolean isActive; + + private ConfigurationContext owningContext; + + ConfigurationContext getOwningContext() { + return owningContext; + } + + void setOwningContext(final ConfigurationContext owningContext) { + this.owningContext = owningContext; + } + + protected AbstractStage() { + this.id = this.createId(); + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); + } + + /** + * @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type. + */ + public String getId() { + return this.id; + } + + @Override + public String toString() { + return this.getClass().getName() + ": " + this.getId(); + } + + private String createId() { + String simpleName = this.getClass().getSimpleName(); + + Integer numInstances = INSTANCES_COUNTER.get(simpleName); + if (null == numInstances) { + numInstances = 0; + } + + String newId = simpleName + "-" + numInstances; + INSTANCES_COUNTER.put(simpleName, ++numInstances); + return newId; + } + + @SuppressWarnings("PMD.DefaultPackage") + static void clearInstanceCounters() { + INSTANCES_COUNTER.clear(); + } + + // public abstract Stage getParentStage(); + // + // public abstract void setParentStage(Stage parentStage, int index); + + protected final void returnNoElement() { + throw NOT_ENOUGH_INPUT_EXCEPTION; + } + + protected final void executeStage() { + try { + this.execute(); + } catch (NotEnoughInputException e) { + throw e; + } catch (TerminateException e) { + throw e; + } catch (Exception e) { + final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this); + if (furtherExecution == FurtherExecution.TERMINATE) { + throw TerminateException.INSTANCE; + } + } + } + + protected abstract void execute(); + + public Thread getOwningThread() { + return owningThread; + } + + void setOwningThread(final Thread owningThread) { + if (this.owningThread != null && this.owningThread != owningThread) { + // checks also for "crossing threads" + // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); + } + this.owningThread = owningThread; + } + + // events + + protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { + this.exceptionListener = exceptionHandler; + } + + public boolean isActive() { + return isActive; + } + + void setActive(final boolean isActive) { + this.isActive = isActive; + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + */ + public void declareActive() { + declareActive(getId()); + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + public void declareActive(final String threadName) { + AbstractRunnableStage runnable = AbstractRunnableStage.create(this); + Thread newThread = new TeeTimeThread(runnable, threadName); + this.setOwningThread(newThread); + this.setActive(true); + } private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); @@ -36,17 +187,14 @@ public abstract class AbstractStage extends Stage { private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); private volatile StageState currentState = StageState.CREATED; - @Override protected List<InputPort<?>> getInputPorts() { return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version } - @Override protected List<OutputPort<?>> getOutputPorts() { return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version } - @Override public StageState getCurrentState() { return currentState; } @@ -55,7 +203,6 @@ public abstract class AbstractStage extends Stage { * May not be invoked outside of IPipe implementations */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { Class<? extends ISignal> signalClass = signal.getClass(); @@ -113,20 +260,24 @@ public abstract class AbstractStage extends Stage { } } - @Override public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { this.validateOutputPorts(invalidPortConnections); changeState(StageState.VALIDATED); } + /** + * Event that is triggered within the initialization phase of the analysis. + * It does not count to the execution time. + * + * @throws Exception + * an arbitrary exception if an error occurs during the initialization + */ @SuppressWarnings("PMD.SignatureDeclareThrowsException") - @Override public void onStarting() throws Exception { changeState(StageState.STARTED); } @SuppressWarnings("PMD.SignatureDeclareThrowsException") - @Override public void onTerminating() throws Exception { changeState(StageState.TERMINATED); } @@ -255,8 +406,13 @@ public abstract class AbstractStage extends Stage { return outputPort; } + /** + * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. + * + * @param invalidPortConnections + * <i>(Passed as parameter for performance reasons)</i> + */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { final IPipe<?> pipe = outputPort.getPipe(); @@ -270,23 +426,19 @@ public abstract class AbstractStage extends Stage { } } - @Override protected void terminate() { changeState(StageState.TERMINATING); } - @Override protected void abort() { this.terminate(); this.getOwningThread().interrupt(); }; - @Override protected boolean shouldBeTerminated() { return (getCurrentState() == StageState.TERMINATING); } - @Override protected TerminationStrategy getTerminationStrategy() { return TerminationStrategy.BY_SIGNAL; } @@ -303,7 +455,6 @@ public abstract class AbstractStage extends Stage { // return inputPort; // } - @Override protected void removeDynamicPort(final OutputPort<?> outputPort) { outputPorts.remove(outputPort); // TODO update setIndex IF it is still used } @@ -312,7 +463,6 @@ public abstract class AbstractStage extends Stage { outputPorts.addPortRemovedListener(outputPortRemovedListener); } - @Override protected void removeDynamicPort(final InputPort<?> inputPort) { inputPorts.remove(inputPort); // TODO update setIndex IF it is still used } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index de6ba150..2812bce5 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -33,7 +33,7 @@ public abstract class Configuration extends AbstractCompositeStage { private boolean initialized; private boolean executed; - private Stage startStage; + private AbstractStage startStage; protected Configuration() { this(new TerminatingExceptionListenerFactory()); @@ -84,7 +84,7 @@ public abstract class Configuration extends AbstractCompositeStage { return context; } - Stage getStartStage() { + AbstractStage getStartStage() { return startStage; } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 054821bb..7f9edc4d 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -19,7 +19,7 @@ import java.util.Set; /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. - * Stages can be added by executing {@link #declareActive(Stage)}. + * Stages can be added by executing {@link #declareActive(AbstractStage)}. * * @since 2.0 */ @@ -37,7 +37,7 @@ final class ConfigurationContext { this.threadService = new ThreadService(configuration); } - Set<Stage> getThreadableStages() { + Set<AbstractStage> getThreadableStages() { return threadService.getThreadableStages(); } diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index a8c98167..d47f0bf0 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -76,8 +76,8 @@ public final class Execution<T extends Configuration> { // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStages = configurationContext.getThreadableStages(); - for (Stage stage : threadableStages) { + final Set<AbstractStage> threadableStages = configurationContext.getThreadableStages(); + for (AbstractStage stage : threadableStages) { // // portConnectionValidator.validate(stage); // } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index ddb6188c..2eedef9e 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -40,9 +40,9 @@ class ExecutionInstantiation { void instantiatePipes() { int color = DEFAULT_COLOR; - Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStages = context.getThreadableStages(); - for (Stage threadableStage : threadableStages) { + Map<AbstractStage, Integer> colors = new HashMap<AbstractStage, Integer>(); + Set<AbstractStage> threadableStages = context.getThreadableStages(); + for (AbstractStage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); @@ -53,18 +53,18 @@ class ExecutionInstantiation { private static class ThreadPainter { - private final Map<Stage, Integer> colors; + private final Map<AbstractStage, Integer> colors; private final int color; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public ThreadPainter(final Map<Stage, Integer> colors, final int color, final Set<Stage> threadableStages) { + public ThreadPainter(final Map<AbstractStage, Integer> colors, final int color, final Set<AbstractStage> threadableStages) { super(); this.colors = colors; this.color = color; this.threadableStages = threadableStages; } - public int colorAndConnectStages(final Stage stage) { + public int colorAndConnectStages(final AbstractStage stage) { int createdConnections = 0; for (OutputPort<?> outputPort : stage.getOutputPorts()) { @@ -82,7 +82,7 @@ class ExecutionInstantiation { private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) { int numCreatedConnections; - Stage targetStage = pipe.getTargetPort().getOwningStage(); + AbstractStage targetStage = pipe.getTargetPort().getOwningStage(); int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; if (threadableStages.contains(targetStage) && targetColor != color) { diff --git a/src/main/java/teetime/framework/ITraverserVisitor.java b/src/main/java/teetime/framework/ITraverserVisitor.java index f089a258..0b312a88 100644 --- a/src/main/java/teetime/framework/ITraverserVisitor.java +++ b/src/main/java/teetime/framework/ITraverserVisitor.java @@ -20,7 +20,7 @@ import teetime.framework.pipe.DummyPipe; public interface ITraverserVisitor { - VisitorBehavior visit(Stage stage); + VisitorBehavior visit(AbstractStage stage); VisitorBehavior visit(AbstractPort<?> port); diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d4409bb1..ef67ab0d 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -26,7 +26,7 @@ package teetime.framework; */ public final class InputPort<T> extends AbstractPort<T> { - InputPort(final Class<T> type, final Stage owningStage, final String portName) { + InputPort(final Class<T> type, final AbstractStage owningStage, final String portName) { super(type, owningStage, portName); } diff --git a/src/main/java/teetime/framework/IntraStageCollector.java b/src/main/java/teetime/framework/IntraStageCollector.java index 3e95ba4d..f3f5f8d9 100644 --- a/src/main/java/teetime/framework/IntraStageCollector.java +++ b/src/main/java/teetime/framework/IntraStageCollector.java @@ -20,15 +20,15 @@ import teetime.framework.pipe.DummyPipe; public class IntraStageCollector implements ITraverserVisitor { - private final Stage startStage; + private final AbstractStage startStage; - public IntraStageCollector(final Stage startStage) { + public IntraStageCollector(final AbstractStage startStage) { super(); this.startStage = startStage; } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { if (stage == startStage || stage.getOwningThread() == null /* before execution */ || stage.getOwningThread() == startStage.getOwningThread() /* while execution */) { return VisitorBehavior.CONTINUE; diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 0f2db809..041ef186 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -30,7 +30,7 @@ import teetime.framework.signal.TerminatingSignal; */ public final class OutputPort<T> extends AbstractPort<T> { - OutputPort(final Class<T> type, final Stage owningStage, final String portName) { + OutputPort(final Class<T> type, final AbstractStage owningStage, final String portName) { super(type, owningStage, portName); setPipe(DummyPipe.INSTANCE); } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 60645119..fa3e7ad7 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -26,7 +26,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { * @param stage * to execute within an own thread */ - public RunnableConsumerStage(final Stage stage) { + public RunnableConsumerStage(final AbstractStage stage) { super(stage); } @@ -47,7 +47,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } } - private void checkForTerminationSignal(final Stage stage) { + private void checkForTerminationSignal(final AbstractStage stage) { // FIXME should getInputPorts() really be defined in Stage? for (InputPort<?> inputPort : stage.getInputPorts()) { if (inputPort.isClosed()) { diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 76a9831b..90031ad3 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -24,7 +24,7 @@ public class RunnableProducerStage extends AbstractRunnableStage { private final Semaphore startSemaphore = new Semaphore(0); - public RunnableProducerStage(final Stage stage) { + public RunnableProducerStage(final AbstractStage stage) { super(stage); } diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index e3e34877..8d4b7503 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -23,7 +23,7 @@ public final class RuntimeServiceFacade { // singleton } - public void startWithinNewThread(final Stage previousStage, final Stage stage) { + public void startWithinNewThread(final AbstractStage previousStage, final AbstractStage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java deleted file mode 100644 index 4851210d..00000000 --- a/src/main/java/teetime/framework/Stage.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import teetime.framework.exceptionHandling.AbstractExceptionListener; -import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; -import teetime.framework.exceptionHandling.TerminateException; -import teetime.framework.signal.ISignal; -import teetime.framework.validation.InvalidPortConnection; - -/** - * Represents a minimal Stage, with some pre-defined methods. - * Implemented stages need to adapt all abstract methods with own implementations. - */ -@SuppressWarnings("PMD.AbstractNaming") -public abstract class Stage { - - private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); - private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); - - private final String id; - /** - * A unique logger instance per stage instance - */ - @SuppressWarnings("PMD.LoggerIsNotStaticFinal") - protected final Logger logger; - - protected AbstractExceptionListener exceptionListener; - - /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ - private Thread owningThread; - - private boolean isActive; - - private ConfigurationContext owningContext; - - ConfigurationContext getOwningContext() { - return owningContext; - } - - void setOwningContext(final ConfigurationContext owningContext) { - this.owningContext = owningContext; - } - - protected Stage() { - this.id = this.createId(); - this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); - } - - /** - * @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type. - */ - public String getId() { - return this.id; - } - - @Override - public String toString() { - return this.getClass().getName() + ": " + this.getId(); - } - - private String createId() { - String simpleName = this.getClass().getSimpleName(); - - Integer numInstances = INSTANCES_COUNTER.get(simpleName); - if (null == numInstances) { - numInstances = 0; - } - - String newId = simpleName + "-" + numInstances; - INSTANCES_COUNTER.put(simpleName, ++numInstances); - return newId; - } - - @SuppressWarnings("PMD.DefaultPackage") - static void clearInstanceCounters() { - INSTANCES_COUNTER.clear(); - } - - // public abstract Stage getParentStage(); - // - // public abstract void setParentStage(Stage parentStage, int index); - - protected final void returnNoElement() { - throw NOT_ENOUGH_INPUT_EXCEPTION; - } - - /** - * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. - * - * @param invalidPortConnections - * <i>(Passed as parameter for performance reasons)</i> - */ - public abstract void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections); - - protected final void executeStage() { - try { - this.execute(); - } catch (NotEnoughInputException e) { - throw e; - } catch (TerminateException e) { - throw e; - } catch (Exception e) { - final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this); - if (furtherExecution == FurtherExecution.TERMINATE) { - throw TerminateException.INSTANCE; - } - } - } - - protected abstract void execute(); - - protected abstract void onSignal(ISignal signal, InputPort<?> inputPort); - - protected abstract TerminationStrategy getTerminationStrategy(); - - protected abstract void terminate(); - - protected abstract void abort(); - - protected abstract boolean shouldBeTerminated(); - - public abstract StageState getCurrentState(); - - public Thread getOwningThread() { - return owningThread; - } - - void setOwningThread(final Thread owningThread) { - if (this.owningThread != null && this.owningThread != owningThread) { - // checks also for "crossing threads" - // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); - } - this.owningThread = owningThread; - } - - protected abstract List<InputPort<?>> getInputPorts(); - - protected abstract List<OutputPort<?>> getOutputPorts(); - - // events - - public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); - - /** - * Event that is triggered within the initialization phase of the analysis. - * It does not count to the execution time. - * - * @throws Exception - * an arbitrary exception if an error occurs during the initialization - */ - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onStarting() throws Exception; - - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onTerminating() throws Exception; - - protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { - this.exceptionListener = exceptionHandler; - } - - protected abstract void removeDynamicPort(OutputPort<?> outputPort); - - protected abstract void removeDynamicPort(InputPort<?> inputPort); - - public boolean isActive() { - return isActive; - } - - void setActive(final boolean isActive) { - this.isActive = isActive; - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - */ - public void declareActive() { - declareActive(getId()); - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - * @param threadName - * A string which can be used for debugging. - */ - public void declareActive(final String threadName) { - AbstractRunnableStage runnable = AbstractRunnableStage.create(this); - Thread newThread = new TeeTimeThread(runnable, threadName); - this.setOwningThread(newThread); - this.setActive(true); - } - -} diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index c7656c57..599b0146 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -44,7 +44,7 @@ class ThreadService extends AbstractService<ThreadService> { private final List<Thread> infiniteProducerThreads = Collections.synchronizedList(new LinkedList<Thread>()); private final SignalingCounter runnableCounter = new SignalingCounter(); - private final Set<Stage> threadableStages = Collections.synchronizedSet(new HashSet<Stage>()); + private final Set<AbstractStage> threadableStages = Collections.synchronizedSet(new HashSet<AbstractStage>()); private final Configuration configuration; @@ -54,23 +54,23 @@ class ThreadService extends AbstractService<ThreadService> { @Override void onInitialize() { - Stage startStage = configuration.getStartStage(); + AbstractStage startStage = configuration.getStartStage(); - Set<Stage> newThreadableStages = initialize(startStage); + Set<AbstractStage> newThreadableStages = initialize(startStage); startThreads(newThreadableStages); } - void startStageAtRuntime(final Stage newStage) { + void startStageAtRuntime(final AbstractStage newStage) { newStage.declareActive(); - Set<Stage> newThreadableStages = initialize(newStage); + Set<AbstractStage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); sendStartingSignal(newThreadableStages); } // extracted for runtime use - private Set<Stage> initialize(final Stage startStage) { + private Set<AbstractStage> initialize(final AbstractStage startStage) { if (startStage == null) { throw new IllegalStateException("The start stage may not be null."); } @@ -81,7 +81,7 @@ class ThreadService extends AbstractService<ThreadService> { Traverser traversor = new Traverser(stageCollector, Direction.BOTH); traversor.traverse(startStage); - Set<Stage> newThreadableStages = stageCollector.getThreadableStages(); + Set<AbstractStage> newThreadableStages = stageCollector.getThreadableStages(); threadableStages.addAll(newThreadableStages); if (threadableStages.isEmpty()) { @@ -98,14 +98,14 @@ class ThreadService extends AbstractService<ThreadService> { A4StageAttributeSetter attributeSetter = new A4StageAttributeSetter(configuration, newThreadableStages); attributeSetter.setAttributes(); - for (Stage stage : newThreadableStages) { + for (AbstractStage stage : newThreadableStages) { categorizeThreadableStage(stage); } return newThreadableStages; } - private void categorizeThreadableStage(final Stage stage) { + private void categorizeThreadableStage(final AbstractStage stage) { switch (stage.getTerminationStrategy()) { case BY_INTERRUPT: infiniteProducerThreads.add(stage.getOwningThread()); @@ -122,15 +122,15 @@ class ThreadService extends AbstractService<ThreadService> { } } - private void startThreads(final Set<Stage> threadableStages) { - for (Stage stage : threadableStages) { + private void startThreads(final Set<AbstractStage> threadableStages) { + for (AbstractStage stage : threadableStages) { stage.getOwningThread().start(); } } - private void sendStartingSignal(final Set<Stage> newThreadableStages) { + private void sendStartingSignal(final Set<AbstractStage> newThreadableStages) { synchronized (newThreadableStages) { - for (Stage stage : newThreadableStages) { + for (AbstractStage stage : newThreadableStages) { ((TeeTimeThread) stage.getOwningThread()).sendStartingSignal(); } } @@ -146,9 +146,9 @@ class ThreadService extends AbstractService<ThreadService> { abortStages(threadableStages); } - private void abortStages(final Set<Stage> currentTreadableStages) { + private void abortStages(final Set<AbstractStage> currentTreadableStages) { synchronized (currentTreadableStages) { - for (Stage stage : currentTreadableStages) { + for (AbstractStage stage : currentTreadableStages) { stage.abort(); } } @@ -193,7 +193,7 @@ class ThreadService extends AbstractService<ThreadService> { return exceptions; } - Set<Stage> getThreadableStages() { + Set<AbstractStage> getThreadableStages() { return threadableStages; } diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java index 6bb62c64..a5dc4497 100644 --- a/src/main/java/teetime/framework/Traverser.java +++ b/src/main/java/teetime/framework/Traverser.java @@ -47,7 +47,7 @@ public class Traverser { CONTINUE, STOP; } - private final Set<Stage> visitedStages = new HashSet<Stage>(); + private final Set<AbstractStage> visitedStages = new HashSet<AbstractStage>(); private final ITraverserVisitor traverserVisitor; private final Direction direction; @@ -61,7 +61,7 @@ public class Traverser { this.direction = direction; } - public void traverse(final Stage stage) { + public void traverse(final AbstractStage stage) { VisitorBehavior behavior = traverserVisitor.visit(stage); if (behavior == VisitorBehavior.STOP || !visitedStages.add(stage)) { return; @@ -93,7 +93,7 @@ public class Traverser { } } - public Set<Stage> getVisitedStages() { + public Set<AbstractStage> getVisitedStages() { return visitedStages; } } diff --git a/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java index 81cc2f02..e317fe51 100644 --- a/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java @@ -21,12 +21,12 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; /** * Represents a minimalistic StageExceptionListener. * Listener which extend from this one, must a least implement this functionality. - * This abstract class provides a Logger {@link #logger} and the method {@link #onStageException(Exception, Stage)} which is called on every raised exception. + * This abstract class provides a Logger {@link #logger} and the method {@link #onStageException(Exception, AbstractStage)} which is called on every raised exception. */ public abstract class AbstractExceptionListener { @@ -57,13 +57,13 @@ public abstract class AbstractExceptionListener { * @return * true, if the thread should be terminated, false otherwise */ - public abstract FurtherExecution onStageException(Exception e, Stage throwingStage); + public abstract FurtherExecution onStageException(Exception e, AbstractStage throwingStage); public List<Exception> getLoggedExceptions() { return exceptionsList; } - public FurtherExecution reportException(final Exception e, final Stage stage) { + public FurtherExecution reportException(final Exception e, final AbstractStage stage) { if (logExceptions) { exceptionsList.add(e); } diff --git a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java index 3eb54451..e87cf8a7 100644 --- a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class IgnoringExceptionListener extends AbstractExceptionListener { @@ -24,7 +24,7 @@ class IgnoringExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { return FurtherExecution.CONTINUE; } } diff --git a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java index 8fee623c..125fabb0 100644 --- a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class LoggingExceptionListener extends AbstractExceptionListener { @@ -24,7 +24,7 @@ class LoggingExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { logger.warn("Exception occurred in " + throwingStage.getId(), e); return FurtherExecution.CONTINUE; } diff --git a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java index 2490c0e4..d7aab2a2 100644 --- a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java @@ -18,7 +18,7 @@ package teetime.framework.exceptionHandling; import java.util.ArrayList; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class TerminatingExceptionListener extends AbstractExceptionListener { @@ -29,7 +29,7 @@ class TerminatingExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { if (logger.isWarnEnabled()) { logger.warn("Exception occurred in " + throwingStage.getId(), e); } diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 80469661..f04d7816 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -19,11 +19,11 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public interface ISignal { - void trigger(Stage stage) throws Exception; + void trigger(AbstractStage stage) throws Exception; // Only used by the merger so far boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts); diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 38ad2ce4..3f884982 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,12 +19,12 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class StartingSignal implements ISignal { @Override - public void trigger(final Stage stage) throws Exception { + public void trigger(final AbstractStage stage) throws Exception { stage.onStarting(); } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 211c57bb..94110736 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -19,12 +19,12 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class TerminatingSignal implements ISignal { @Override - public void trigger(final Stage stage) throws Exception { + public void trigger(final AbstractStage stage) throws Exception { stage.onTerminating(); } diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java index f85b4ada..5e38b9d7 100644 --- a/src/main/java/teetime/framework/signal/ValidatingSignal.java +++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.framework.validation.InvalidPortConnection; public final class ValidatingSignal implements ISignal { @@ -28,7 +28,7 @@ public final class ValidatingSignal implements ISignal { private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>(); @Override - public void trigger(final Stage stage) { + public void trigger(final AbstractStage stage) { stage.onValidating(this.invalidPortConnections); } diff --git a/src/main/java/teetime/framework/test/InputHolder.java b/src/main/java/teetime/framework/test/InputHolder.java index 29bce044..9d58aae1 100644 --- a/src/main/java/teetime/framework/test/InputHolder.java +++ b/src/main/java/teetime/framework/test/InputHolder.java @@ -16,18 +16,18 @@ package teetime.framework.test; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class InputHolder<I> { private final StageTester stageTester; - private final Stage stage; + private final AbstractStage stage; private final Iterable<Object> input; private InputPort<Object> port; @SuppressWarnings("unchecked") - InputHolder(final StageTester stageTester, final Stage stage, final Iterable<I> input) { + InputHolder(final StageTester stageTester, final AbstractStage stage, final Iterable<I> input) { this.stageTester = stageTester; this.stage = stage; this.input = (Iterable<Object>) input; diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 10313c67..d0d23b14 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.framework.StageState; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -36,13 +36,13 @@ public final class StageTester { private final List<InputHolder<?>> inputHolders = new ArrayList<InputHolder<?>>(); private final List<OutputHolder<?>> outputHolders = new ArrayList<OutputHolder<?>>(); - private final Stage stage; + private final AbstractStage stage; - private StageTester(final Stage stage) { + private StageTester(final AbstractStage stage) { this.stage = stage; } - public static StageTester test(final Stage stage) { + public static StageTester test(final AbstractStage stage) { if (stage.getCurrentState() != StageState.CREATED) { throw new AssertionError("This stage has already been tested in this test method. Move this test into a new test method."); } @@ -85,7 +85,7 @@ public final class StageTester { private final class TestConfiguration extends Configuration { - public TestConfiguration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { + public TestConfiguration(final List<InputHolder<?>> inputHolders, final AbstractStage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); connectPorts(producer.getOutputPort(), inputHolder.getPort()); diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 4692ea3c..9c82547e 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.framework.AbstractCompositeStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.stage.EveryXthStage; import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; @@ -29,7 +29,7 @@ import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; public final class EveryXthPrinter<T> extends AbstractCompositeStage { private final Distributor<T> distributor; - private final List<Stage> lastStages = new ArrayList<Stage>(); + private final List<AbstractStage> lastStages = new ArrayList<AbstractStage>(); public EveryXthPrinter(final int threshold) { distributor = new Distributor<T>(new CopyByReferenceStrategy()); @@ -50,7 +50,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { return distributor.getNewOutputPort(); } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return distributor; } diff --git a/src/main/java/teetime/util/framework/port/PortAction.java b/src/main/java/teetime/util/framework/port/PortAction.java index 52188070..aec07b57 100644 --- a/src/main/java/teetime/util/framework/port/PortAction.java +++ b/src/main/java/teetime/util/framework/port/PortAction.java @@ -15,9 +15,9 @@ */ package teetime.util.framework.port; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; -public interface PortAction<T extends Stage> { +public interface PortAction<T extends AbstractStage> { public abstract void execute(final T stage); diff --git a/src/main/java/teetime/util/framework/port/PortActionHelper.java b/src/main/java/teetime/util/framework/port/PortActionHelper.java index c87aebc6..de509c6c 100644 --- a/src/main/java/teetime/util/framework/port/PortActionHelper.java +++ b/src/main/java/teetime/util/framework/port/PortActionHelper.java @@ -23,7 +23,7 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; @@ -44,14 +44,14 @@ public final class PortActionHelper { return portActions; } - public static <T extends Stage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) { + public static <T extends AbstractStage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) { PortAction<T> dynamicPortAction = portActions.poll(); if (null != dynamicPortAction) { dynamicPortAction.execute(stage); } } - public static <T extends Stage> void checkBlockingForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) + public static <T extends AbstractStage> void checkBlockingForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) throws InterruptedException { PortAction<T> dynamicPortAction = portActions.take(); dynamicPortAction.execute(stage); diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/AbstractStageTest.java similarity index 95% rename from src/test/java/teetime/framework/StageTest.java rename to src/test/java/teetime/framework/AbstractStageTest.java index 3081b346..ed883704 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/AbstractStageTest.java @@ -29,11 +29,11 @@ import teetime.stage.Cache; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; -public class StageTest { +public class AbstractStageTest { @Test public void testId() { - Stage.clearInstanceCounters(); + AbstractStage.clearInstanceCounters(); Counter<Object> counter0 = new Counter<Object>(); Counter<Object> counter1 = new Counter<Object>(); @@ -77,8 +77,6 @@ public class StageTest { private final long delayInMs; - public boolean finished; - public DelayAndTerminate(final long delayInMs) { super(); this.delayInMs = delayInMs; @@ -89,8 +87,8 @@ public class StageTest { try { Thread.sleep(delayInMs); } catch (InterruptedException e) { + throw new IllegalStateException(e); } - finished = true; } } diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java index b4f3c0de..d8d74d94 100644 --- a/src/test/java/teetime/framework/TraverserTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -45,7 +45,7 @@ public class TraverserTest { Traverser traversor = new Traverser(new IntraStageCollector(tc.init)); traversor.traverse(tc.init); - Set<Stage> comparingStages = new HashSet<Stage>(); + Set<AbstractStage> comparingStages = new HashSet<AbstractStage>(); comparingStages.add(tc.init); comparingStages.add(tc.f2b); comparingStages.add(tc.distributor); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 507dc935..9dbe7e68 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -28,10 +28,10 @@ class WaitStrategyConfiguration extends Configuration { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { - Stage producer = buildProducer(elements); + AbstractStage producer = buildProducer(elements); producer.declareActive(); - Stage consumer = buildConsumer(delay); + AbstractStage consumer = buildConsumer(delay); consumer.declareActive(); Clock clock = buildClock(initialDelayInMs, delay); @@ -47,7 +47,7 @@ class WaitStrategyConfiguration extends Configuration { return clock; } - private Stage buildProducer(final Object... elements) { + private AbstractStage buildProducer(final Object... elements) { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index a44f45bf..1387bc41 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -28,7 +28,7 @@ class YieldStrategyConfiguration extends Configuration { InitialElementProducer<Object> producer = buildProducer(elements); producer.declareActive(); - Stage consumer = buildConsumer(producer); + AbstractStage consumer = buildConsumer(producer); consumer.declareActive(); } diff --git a/src/test/java/teetime/framework/exceptionHandling/TestListener.java b/src/test/java/teetime/framework/exceptionHandling/TestListener.java index 06b32bcd..79f3dd54 100644 --- a/src/test/java/teetime/framework/exceptionHandling/TestListener.java +++ b/src/test/java/teetime/framework/exceptionHandling/TestListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public class TestListener extends AbstractExceptionListener { @@ -26,7 +26,7 @@ public class TestListener extends AbstractExceptionListener { private int numExceptionsInvoked; @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { numExceptionsInvoked++; if (numExceptionsInvoked == 2) { return FurtherExecution.TERMINATE; diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index f8df4b07..19db569d 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -28,7 +28,7 @@ import org.junit.Test; import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.OutputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.util.framework.port.PortAction; @@ -117,7 +117,7 @@ public class DynamicDistributorTest { } private void assertValuesForIndex(final PortAction<DynamicDistributor<Integer>> ia, final List<Integer> values) { - Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); + AbstractStage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); @SuppressWarnings("unchecked") CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; -- GitLab