diff --git a/src/main/java/teetime/framework/A1ThreadableStageCollector.java b/src/main/java/teetime/framework/A1ThreadableStageCollector.java index 387ad88f18ec95014136bfa6aedf145d1e11aadf..0ec14de386c5223e881d7551fab6d494b13785f0 100644 --- a/src/main/java/teetime/framework/A1ThreadableStageCollector.java +++ b/src/main/java/teetime/framework/A1ThreadableStageCollector.java @@ -26,14 +26,14 @@ import teetime.framework.pipe.DummyPipe; */ class A1ThreadableStageCollector implements ITraverserVisitor { - private final Set<Stage> threadableStages = new HashSet<Stage>(); + private final Set<AbstractStage> threadableStages = new HashSet<AbstractStage>(); - public Set<Stage> getThreadableStages() { + public Set<AbstractStage> getThreadableStages() { return threadableStages; } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) { threadableStages.add(stage); } diff --git a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java index fd4712307644ecd9e4305f31f3b491961e9ad925..1a36cbd329924056e6c3016996e15585fc5b4388 100644 --- a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java +++ b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java @@ -31,17 +31,17 @@ public class A2InvalidThreadAssignmentCheck { private static final int DEFAULT_COLOR = 0; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public A2InvalidThreadAssignmentCheck(final Set<Stage> threadableStages) { + public A2InvalidThreadAssignmentCheck(final Set<AbstractStage> threadableStages) { this.threadableStages = threadableStages; } public void check() { int color = DEFAULT_COLOR; - ObjectIntMap<Stage> colors = new ObjectIntHashMap<Stage>(); + ObjectIntMap<AbstractStage> colors = new ObjectIntHashMap<AbstractStage>(); - for (Stage threadableStage : threadableStages) { + for (AbstractStage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); @@ -53,11 +53,11 @@ public class A2InvalidThreadAssignmentCheck { private static class ThreadPainter implements ITraverserVisitor { - private final ObjectIntMap<Stage> colors; + private final ObjectIntMap<AbstractStage> colors; private final int color; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public ThreadPainter(final ObjectIntMap<Stage> colors, final int color, final Set<Stage> threadableStages) { + public ThreadPainter(final ObjectIntMap<AbstractStage> colors, final int color, final Set<AbstractStage> threadableStages) { super(); this.colors = colors; this.color = color; @@ -65,7 +65,7 @@ public class A2InvalidThreadAssignmentCheck { } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { return VisitorBehavior.CONTINUE; } @@ -73,7 +73,7 @@ public class A2InvalidThreadAssignmentCheck { public VisitorBehavior visit(final AbstractPort<?> port) { IPipe<?> pipe = port.getPipe(); // FIXME line below requires FORWARD. should be independent of the used direction - Stage targetStage = pipe.getTargetPort().getOwningStage(); + AbstractStage targetStage = pipe.getTargetPort().getOwningStage(); int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index 72afb486ed82d9dc7d9bcaa8961aa6ded9d7a581..f59a75e7bd834f312fa197f5e0aff8f7e4229d58 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -44,7 +44,7 @@ class A3PipeInstantiation implements ITraverserVisitor { private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { return VisitorBehavior.CONTINUE; } diff --git a/src/main/java/teetime/framework/A4StageAttributeSetter.java b/src/main/java/teetime/framework/A4StageAttributeSetter.java index 98373e41c8f687a6644a7fdff87a77377a498240..99f45eb9c8d9b3f01cab136022ba60ca0942bd0d 100644 --- a/src/main/java/teetime/framework/A4StageAttributeSetter.java +++ b/src/main/java/teetime/framework/A4StageAttributeSetter.java @@ -23,21 +23,21 @@ import java.util.Set; class A4StageAttributeSetter { private final Configuration configuration; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public A4StageAttributeSetter(final Configuration configuration, final Set<Stage> threadableStages) { + public A4StageAttributeSetter(final Configuration configuration, final Set<AbstractStage> threadableStages) { super(); this.configuration = configuration; this.threadableStages = threadableStages; } public void setAttributes() { - for (Stage threadableStage : threadableStages) { + for (AbstractStage threadableStage : threadableStages) { setAttributes(threadableStage); } } - private void setAttributes(final Stage threadableStage) { + private void setAttributes(final AbstractStage threadableStage) { IntraStageCollector visitor = new IntraStageCollector(threadableStage); Traverser traverser = new Traverser(visitor); traverser.traverse(threadableStage); @@ -45,12 +45,12 @@ class A4StageAttributeSetter { setAttributes(threadableStage, traverser.getVisitedStages()); } - private void setAttributes(final Stage threadableStage, final Set<Stage> intraStages) { + private void setAttributes(final AbstractStage threadableStage, final Set<AbstractStage> intraStages) { threadableStage.setExceptionHandler(configuration.getFactory().createInstance(threadableStage.getOwningThread())); // threadableStage.setOwningThread(owningThread); threadableStage.setOwningContext(configuration.getContext()); - for (Stage stage : intraStages) { + for (AbstractStage stage : intraStages) { stage.setExceptionHandler(threadableStage.exceptionListener); stage.setOwningThread(threadableStage.getOwningThread()); stage.setOwningContext(threadableStage.getOwningContext()); diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 55c1f55b3a7cc9cdff05aad45b54c49627e5d8d9..1b0f1546fc6cd301d78d16c2bd1297e0e04eaf8d 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -26,7 +26,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - protected final Stage cachedTargetStage; + protected final AbstractStage cachedTargetStage; private final OutputPort<? extends T> sourcePort; private final InputPort<T> targetPort; diff --git a/src/main/java/teetime/framework/AbstractPort.java b/src/main/java/teetime/framework/AbstractPort.java index 50693ae3cb70ba65ab3e50cc2a342006794b036a..ce28991ea55f701b9ff0b852182e39490de04a7d 100644 --- a/src/main/java/teetime/framework/AbstractPort.java +++ b/src/main/java/teetime/framework/AbstractPort.java @@ -27,10 +27,10 @@ public abstract class AbstractPort<T> { * </p> */ private final Class<T> type; - private final Stage owningStage; + private final AbstractStage owningStage; private final String name; - protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) { + protected AbstractPort(final Class<T> type, final AbstractStage owningStage, final String name) { super(); this.type = type; this.owningStage = owningStage; @@ -41,7 +41,7 @@ public abstract class AbstractPort<T> { return this.type; } - public Stage getOwningStage() { + public AbstractStage getOwningStage() { return owningStage; } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index d40c278d20d90b4d3386dfcf7ad7a8895aa905b2..99cc6d0947e3b829ed0dea20182ae5f1253b38ed 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -31,13 +31,13 @@ abstract class AbstractRunnableStage implements Runnable { private final StopWatch stopWatch = new StopWatch(); - protected final Stage stage; + protected final AbstractStage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; - public static final Map<Stage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<Stage, Long>()); + public static final Map<AbstractStage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<AbstractStage, Long>()); - protected AbstractRunnableStage(final Stage stage) { + protected AbstractRunnableStage(final AbstractStage stage) { if (stage == null) { throw new IllegalArgumentException("Argument stage may not be null"); } @@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable { @Override public final void run() { - final Stage stage = this.stage; // should prevent the stage to be reloaded after a volatile read + final AbstractStage stage = this.stage; // should prevent the stage to be reloaded after a volatile read final Logger logger = this.logger; // should prevent the logger to be reloaded after a volatile read logger.debug("Executing runnable stage..."); @@ -96,7 +96,7 @@ abstract class AbstractRunnableStage implements Runnable { protected abstract void afterStageExecution(); - static AbstractRunnableStage create(final Stage stage) { + static AbstractRunnableStage create(final AbstractStage stage) { if (stage.getInputPorts().size() > 0) { return new RunnableConsumerStage(stage); } else { diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 447b243c8a0d8d1fd709304281de9d62e56478e3..3891b1242d0f86349dc2f8c58f480c1a3418f6bd 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -20,14 +20,165 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.exceptionHandling.AbstractExceptionListener; +import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; +import teetime.framework.exceptionHandling.TerminateException; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; import teetime.util.framework.port.PortList; import teetime.util.framework.port.PortRemovedListener; -public abstract class AbstractStage extends Stage { +/** + * Represents a minimal Stage, with some pre-defined methods. + * Implemented stages need to adapt all abstract methods with own implementations. + */ +@SuppressWarnings("PMD.AbstractNaming") +public abstract class AbstractStage { + + private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); + private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); + + private final String id; + /** + * A unique logger instance per stage instance + */ + @SuppressWarnings("PMD.LoggerIsNotStaticFinal") + protected final Logger logger; + + protected AbstractExceptionListener exceptionListener; + + /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ + private Thread owningThread; + + private boolean isActive; + + private ConfigurationContext owningContext; + + ConfigurationContext getOwningContext() { + return owningContext; + } + + void setOwningContext(final ConfigurationContext owningContext) { + this.owningContext = owningContext; + } + + protected AbstractStage() { + this.id = this.createId(); + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); + } + + /** + * @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type. + */ + public String getId() { + return this.id; + } + + @Override + public String toString() { + return this.getClass().getName() + ": " + this.getId(); + } + + private String createId() { + String simpleName = this.getClass().getSimpleName(); + + Integer numInstances = INSTANCES_COUNTER.get(simpleName); + if (null == numInstances) { + numInstances = 0; + } + + String newId = simpleName + "-" + numInstances; + INSTANCES_COUNTER.put(simpleName, ++numInstances); + return newId; + } + + @SuppressWarnings("PMD.DefaultPackage") + static void clearInstanceCounters() { + INSTANCES_COUNTER.clear(); + } + + // public abstract Stage getParentStage(); + // + // public abstract void setParentStage(Stage parentStage, int index); + + protected final void returnNoElement() { + throw NOT_ENOUGH_INPUT_EXCEPTION; + } + + protected final void executeStage() { + try { + this.execute(); + } catch (NotEnoughInputException e) { + throw e; + } catch (TerminateException e) { + throw e; + } catch (Exception e) { + final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this); + if (furtherExecution == FurtherExecution.TERMINATE) { + throw TerminateException.INSTANCE; + } + } + } + + protected abstract void execute(); + + public Thread getOwningThread() { + return owningThread; + } + + void setOwningThread(final Thread owningThread) { + if (this.owningThread != null && this.owningThread != owningThread) { + // checks also for "crossing threads" + // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); + } + this.owningThread = owningThread; + } + + // events + + protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { + this.exceptionListener = exceptionHandler; + } + + public boolean isActive() { + return isActive; + } + + void setActive(final boolean isActive) { + this.isActive = isActive; + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + */ + public void declareActive() { + declareActive(getId()); + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + public void declareActive(final String threadName) { + AbstractRunnableStage runnable = AbstractRunnableStage.create(this); + Thread newThread = new TeeTimeThread(runnable, threadName); + this.setOwningThread(newThread); + this.setActive(true); + } private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); @@ -36,17 +187,14 @@ public abstract class AbstractStage extends Stage { private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); private volatile StageState currentState = StageState.CREATED; - @Override protected List<InputPort<?>> getInputPorts() { return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version } - @Override protected List<OutputPort<?>> getOutputPorts() { return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version } - @Override public StageState getCurrentState() { return currentState; } @@ -55,7 +203,6 @@ public abstract class AbstractStage extends Stage { * May not be invoked outside of IPipe implementations */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { Class<? extends ISignal> signalClass = signal.getClass(); @@ -113,20 +260,24 @@ public abstract class AbstractStage extends Stage { } } - @Override public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { this.validateOutputPorts(invalidPortConnections); changeState(StageState.VALIDATED); } + /** + * Event that is triggered within the initialization phase of the analysis. + * It does not count to the execution time. + * + * @throws Exception + * an arbitrary exception if an error occurs during the initialization + */ @SuppressWarnings("PMD.SignatureDeclareThrowsException") - @Override public void onStarting() throws Exception { changeState(StageState.STARTED); } @SuppressWarnings("PMD.SignatureDeclareThrowsException") - @Override public void onTerminating() throws Exception { changeState(StageState.TERMINATED); } @@ -255,8 +406,13 @@ public abstract class AbstractStage extends Stage { return outputPort; } + /** + * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. + * + * @param invalidPortConnections + * <i>(Passed as parameter for performance reasons)</i> + */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { final IPipe<?> pipe = outputPort.getPipe(); @@ -270,23 +426,19 @@ public abstract class AbstractStage extends Stage { } } - @Override protected void terminate() { changeState(StageState.TERMINATING); } - @Override protected void abort() { this.terminate(); this.getOwningThread().interrupt(); }; - @Override protected boolean shouldBeTerminated() { return (getCurrentState() == StageState.TERMINATING); } - @Override protected TerminationStrategy getTerminationStrategy() { return TerminationStrategy.BY_SIGNAL; } @@ -303,7 +455,6 @@ public abstract class AbstractStage extends Stage { // return inputPort; // } - @Override protected void removeDynamicPort(final OutputPort<?> outputPort) { outputPorts.remove(outputPort); // TODO update setIndex IF it is still used } @@ -312,7 +463,6 @@ public abstract class AbstractStage extends Stage { outputPorts.addPortRemovedListener(outputPortRemovedListener); } - @Override protected void removeDynamicPort(final InputPort<?> inputPort) { inputPorts.remove(inputPort); // TODO update setIndex IF it is still used } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index de6ba150e10440c8800b09b368b0e3ce3f138da0..2812bce537b81ec7f83393fce2af2e471aaad727 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -33,7 +33,7 @@ public abstract class Configuration extends AbstractCompositeStage { private boolean initialized; private boolean executed; - private Stage startStage; + private AbstractStage startStage; protected Configuration() { this(new TerminatingExceptionListenerFactory()); @@ -84,7 +84,7 @@ public abstract class Configuration extends AbstractCompositeStage { return context; } - Stage getStartStage() { + AbstractStage getStartStage() { return startStage; } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 054821bbbfb075615425c3baddc183e5d49e4ce9..7f9edc4d7a6019fe0119d6daf0925b6cd8bfb6dd 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -19,7 +19,7 @@ import java.util.Set; /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. - * Stages can be added by executing {@link #declareActive(Stage)}. + * Stages can be added by executing {@link #declareActive(AbstractStage)}. * * @since 2.0 */ @@ -37,7 +37,7 @@ final class ConfigurationContext { this.threadService = new ThreadService(configuration); } - Set<Stage> getThreadableStages() { + Set<AbstractStage> getThreadableStages() { return threadService.getThreadableStages(); } diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index a8c98167c67bbf475a3957a15147aecd55b678b3..d47f0bf058915a1d415fd26dbe578ded002d2f2f 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -76,8 +76,8 @@ public final class Execution<T extends Configuration> { // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStages = configurationContext.getThreadableStages(); - for (Stage stage : threadableStages) { + final Set<AbstractStage> threadableStages = configurationContext.getThreadableStages(); + for (AbstractStage stage : threadableStages) { // // portConnectionValidator.validate(stage); // } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index ddb6188c2b05e9bdfc00001007de6430298cd4ca..2eedef9e1ed2eca5177e32f3fba91a870f501d70 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -40,9 +40,9 @@ class ExecutionInstantiation { void instantiatePipes() { int color = DEFAULT_COLOR; - Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStages = context.getThreadableStages(); - for (Stage threadableStage : threadableStages) { + Map<AbstractStage, Integer> colors = new HashMap<AbstractStage, Integer>(); + Set<AbstractStage> threadableStages = context.getThreadableStages(); + for (AbstractStage threadableStage : threadableStages) { color++; colors.put(threadableStage, color); @@ -53,18 +53,18 @@ class ExecutionInstantiation { private static class ThreadPainter { - private final Map<Stage, Integer> colors; + private final Map<AbstractStage, Integer> colors; private final int color; - private final Set<Stage> threadableStages; + private final Set<AbstractStage> threadableStages; - public ThreadPainter(final Map<Stage, Integer> colors, final int color, final Set<Stage> threadableStages) { + public ThreadPainter(final Map<AbstractStage, Integer> colors, final int color, final Set<AbstractStage> threadableStages) { super(); this.colors = colors; this.color = color; this.threadableStages = threadableStages; } - public int colorAndConnectStages(final Stage stage) { + public int colorAndConnectStages(final AbstractStage stage) { int createdConnections = 0; for (OutputPort<?> outputPort : stage.getOutputPorts()) { @@ -82,7 +82,7 @@ class ExecutionInstantiation { private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) { int numCreatedConnections; - Stage targetStage = pipe.getTargetPort().getOwningStage(); + AbstractStage targetStage = pipe.getTargetPort().getOwningStage(); int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; if (threadableStages.contains(targetStage) && targetColor != color) { diff --git a/src/main/java/teetime/framework/ITraverserVisitor.java b/src/main/java/teetime/framework/ITraverserVisitor.java index f089a258104be16050a059274784aab52ffea936..0b312a88dbc44c467d586dc5dd84eb11ed1e1998 100644 --- a/src/main/java/teetime/framework/ITraverserVisitor.java +++ b/src/main/java/teetime/framework/ITraverserVisitor.java @@ -20,7 +20,7 @@ import teetime.framework.pipe.DummyPipe; public interface ITraverserVisitor { - VisitorBehavior visit(Stage stage); + VisitorBehavior visit(AbstractStage stage); VisitorBehavior visit(AbstractPort<?> port); diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index d4409bb173d5c282510c0886842f70e954867ee2..ef67ab0da50d098149791bc2fb15761fec056e13 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -26,7 +26,7 @@ package teetime.framework; */ public final class InputPort<T> extends AbstractPort<T> { - InputPort(final Class<T> type, final Stage owningStage, final String portName) { + InputPort(final Class<T> type, final AbstractStage owningStage, final String portName) { super(type, owningStage, portName); } diff --git a/src/main/java/teetime/framework/IntraStageCollector.java b/src/main/java/teetime/framework/IntraStageCollector.java index 3e95ba4daab30808d950bbe251e0b7bc63376caf..f3f5f8d99651fb2caa933a62ccf9556f6c832e0b 100644 --- a/src/main/java/teetime/framework/IntraStageCollector.java +++ b/src/main/java/teetime/framework/IntraStageCollector.java @@ -20,15 +20,15 @@ import teetime.framework.pipe.DummyPipe; public class IntraStageCollector implements ITraverserVisitor { - private final Stage startStage; + private final AbstractStage startStage; - public IntraStageCollector(final Stage startStage) { + public IntraStageCollector(final AbstractStage startStage) { super(); this.startStage = startStage; } @Override - public VisitorBehavior visit(final Stage stage) { + public VisitorBehavior visit(final AbstractStage stage) { if (stage == startStage || stage.getOwningThread() == null /* before execution */ || stage.getOwningThread() == startStage.getOwningThread() /* while execution */) { return VisitorBehavior.CONTINUE; diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 0f2db809a9c3edd66a1c6d07b478247cfce4fe94..041ef186734d0d1ca4b17f937968d693b3688568 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -30,7 +30,7 @@ import teetime.framework.signal.TerminatingSignal; */ public final class OutputPort<T> extends AbstractPort<T> { - OutputPort(final Class<T> type, final Stage owningStage, final String portName) { + OutputPort(final Class<T> type, final AbstractStage owningStage, final String portName) { super(type, owningStage, portName); setPipe(DummyPipe.INSTANCE); } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 6064511953b3f71f88ad1e13e115ce5fafdc7c55..fa3e7ad7b9d8eab901f90d537af0a100d1a285d0 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -26,7 +26,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { * @param stage * to execute within an own thread */ - public RunnableConsumerStage(final Stage stage) { + public RunnableConsumerStage(final AbstractStage stage) { super(stage); } @@ -47,7 +47,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } } - private void checkForTerminationSignal(final Stage stage) { + private void checkForTerminationSignal(final AbstractStage stage) { // FIXME should getInputPorts() really be defined in Stage? for (InputPort<?> inputPort : stage.getInputPorts()) { if (inputPort.isClosed()) { diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 76a9831be5008c913bf4935f66b6b7d42edc4e47..90031ad39fa07a14783cfaf19e089fb9b9be9e90 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -24,7 +24,7 @@ public class RunnableProducerStage extends AbstractRunnableStage { private final Semaphore startSemaphore = new Semaphore(0); - public RunnableProducerStage(final Stage stage) { + public RunnableProducerStage(final AbstractStage stage) { super(stage); } diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index e3e34877e6f173fde2ed0b240ca4d9e4da4bd750..8d4b7503219778c382005938e6066a5d888e8276 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -23,7 +23,7 @@ public final class RuntimeServiceFacade { // singleton } - public void startWithinNewThread(final Stage previousStage, final Stage stage) { + public void startWithinNewThread(final AbstractStage previousStage, final AbstractStage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java deleted file mode 100644 index 4851210d210ef264edcdce57a111ce091377db97..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/Stage.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import teetime.framework.exceptionHandling.AbstractExceptionListener; -import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; -import teetime.framework.exceptionHandling.TerminateException; -import teetime.framework.signal.ISignal; -import teetime.framework.validation.InvalidPortConnection; - -/** - * Represents a minimal Stage, with some pre-defined methods. - * Implemented stages need to adapt all abstract methods with own implementations. - */ -@SuppressWarnings("PMD.AbstractNaming") -public abstract class Stage { - - private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); - private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); - - private final String id; - /** - * A unique logger instance per stage instance - */ - @SuppressWarnings("PMD.LoggerIsNotStaticFinal") - protected final Logger logger; - - protected AbstractExceptionListener exceptionListener; - - /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ - private Thread owningThread; - - private boolean isActive; - - private ConfigurationContext owningContext; - - ConfigurationContext getOwningContext() { - return owningContext; - } - - void setOwningContext(final ConfigurationContext owningContext) { - this.owningContext = owningContext; - } - - protected Stage() { - this.id = this.createId(); - this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); - } - - /** - * @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type. - */ - public String getId() { - return this.id; - } - - @Override - public String toString() { - return this.getClass().getName() + ": " + this.getId(); - } - - private String createId() { - String simpleName = this.getClass().getSimpleName(); - - Integer numInstances = INSTANCES_COUNTER.get(simpleName); - if (null == numInstances) { - numInstances = 0; - } - - String newId = simpleName + "-" + numInstances; - INSTANCES_COUNTER.put(simpleName, ++numInstances); - return newId; - } - - @SuppressWarnings("PMD.DefaultPackage") - static void clearInstanceCounters() { - INSTANCES_COUNTER.clear(); - } - - // public abstract Stage getParentStage(); - // - // public abstract void setParentStage(Stage parentStage, int index); - - protected final void returnNoElement() { - throw NOT_ENOUGH_INPUT_EXCEPTION; - } - - /** - * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. - * - * @param invalidPortConnections - * <i>(Passed as parameter for performance reasons)</i> - */ - public abstract void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections); - - protected final void executeStage() { - try { - this.execute(); - } catch (NotEnoughInputException e) { - throw e; - } catch (TerminateException e) { - throw e; - } catch (Exception e) { - final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this); - if (furtherExecution == FurtherExecution.TERMINATE) { - throw TerminateException.INSTANCE; - } - } - } - - protected abstract void execute(); - - protected abstract void onSignal(ISignal signal, InputPort<?> inputPort); - - protected abstract TerminationStrategy getTerminationStrategy(); - - protected abstract void terminate(); - - protected abstract void abort(); - - protected abstract boolean shouldBeTerminated(); - - public abstract StageState getCurrentState(); - - public Thread getOwningThread() { - return owningThread; - } - - void setOwningThread(final Thread owningThread) { - if (this.owningThread != null && this.owningThread != owningThread) { - // checks also for "crossing threads" - // throw new IllegalStateException("Attribute owningThread was set twice each with another thread"); - } - this.owningThread = owningThread; - } - - protected abstract List<InputPort<?>> getInputPorts(); - - protected abstract List<OutputPort<?>> getOutputPorts(); - - // events - - public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); - - /** - * Event that is triggered within the initialization phase of the analysis. - * It does not count to the execution time. - * - * @throws Exception - * an arbitrary exception if an error occurs during the initialization - */ - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onStarting() throws Exception; - - @SuppressWarnings("PMD.SignatureDeclareThrowsException") - public abstract void onTerminating() throws Exception; - - protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { - this.exceptionListener = exceptionHandler; - } - - protected abstract void removeDynamicPort(OutputPort<?> outputPort); - - protected abstract void removeDynamicPort(InputPort<?> inputPort); - - public boolean isActive() { - return isActive; - } - - void setActive(final boolean isActive) { - this.isActive = isActive; - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - */ - public void declareActive() { - declareActive(getId()); - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - * @param threadName - * A string which can be used for debugging. - */ - public void declareActive(final String threadName) { - AbstractRunnableStage runnable = AbstractRunnableStage.create(this); - Thread newThread = new TeeTimeThread(runnable, threadName); - this.setOwningThread(newThread); - this.setActive(true); - } - -} diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index c7656c57b43ede82997624b522d02105869b0fcd..599b0146d6512e730ca246b678dcf5c5677a5719 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -44,7 +44,7 @@ class ThreadService extends AbstractService<ThreadService> { private final List<Thread> infiniteProducerThreads = Collections.synchronizedList(new LinkedList<Thread>()); private final SignalingCounter runnableCounter = new SignalingCounter(); - private final Set<Stage> threadableStages = Collections.synchronizedSet(new HashSet<Stage>()); + private final Set<AbstractStage> threadableStages = Collections.synchronizedSet(new HashSet<AbstractStage>()); private final Configuration configuration; @@ -54,23 +54,23 @@ class ThreadService extends AbstractService<ThreadService> { @Override void onInitialize() { - Stage startStage = configuration.getStartStage(); + AbstractStage startStage = configuration.getStartStage(); - Set<Stage> newThreadableStages = initialize(startStage); + Set<AbstractStage> newThreadableStages = initialize(startStage); startThreads(newThreadableStages); } - void startStageAtRuntime(final Stage newStage) { + void startStageAtRuntime(final AbstractStage newStage) { newStage.declareActive(); - Set<Stage> newThreadableStages = initialize(newStage); + Set<AbstractStage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); sendStartingSignal(newThreadableStages); } // extracted for runtime use - private Set<Stage> initialize(final Stage startStage) { + private Set<AbstractStage> initialize(final AbstractStage startStage) { if (startStage == null) { throw new IllegalStateException("The start stage may not be null."); } @@ -81,7 +81,7 @@ class ThreadService extends AbstractService<ThreadService> { Traverser traversor = new Traverser(stageCollector, Direction.BOTH); traversor.traverse(startStage); - Set<Stage> newThreadableStages = stageCollector.getThreadableStages(); + Set<AbstractStage> newThreadableStages = stageCollector.getThreadableStages(); threadableStages.addAll(newThreadableStages); if (threadableStages.isEmpty()) { @@ -98,14 +98,14 @@ class ThreadService extends AbstractService<ThreadService> { A4StageAttributeSetter attributeSetter = new A4StageAttributeSetter(configuration, newThreadableStages); attributeSetter.setAttributes(); - for (Stage stage : newThreadableStages) { + for (AbstractStage stage : newThreadableStages) { categorizeThreadableStage(stage); } return newThreadableStages; } - private void categorizeThreadableStage(final Stage stage) { + private void categorizeThreadableStage(final AbstractStage stage) { switch (stage.getTerminationStrategy()) { case BY_INTERRUPT: infiniteProducerThreads.add(stage.getOwningThread()); @@ -122,15 +122,15 @@ class ThreadService extends AbstractService<ThreadService> { } } - private void startThreads(final Set<Stage> threadableStages) { - for (Stage stage : threadableStages) { + private void startThreads(final Set<AbstractStage> threadableStages) { + for (AbstractStage stage : threadableStages) { stage.getOwningThread().start(); } } - private void sendStartingSignal(final Set<Stage> newThreadableStages) { + private void sendStartingSignal(final Set<AbstractStage> newThreadableStages) { synchronized (newThreadableStages) { - for (Stage stage : newThreadableStages) { + for (AbstractStage stage : newThreadableStages) { ((TeeTimeThread) stage.getOwningThread()).sendStartingSignal(); } } @@ -146,9 +146,9 @@ class ThreadService extends AbstractService<ThreadService> { abortStages(threadableStages); } - private void abortStages(final Set<Stage> currentTreadableStages) { + private void abortStages(final Set<AbstractStage> currentTreadableStages) { synchronized (currentTreadableStages) { - for (Stage stage : currentTreadableStages) { + for (AbstractStage stage : currentTreadableStages) { stage.abort(); } } @@ -193,7 +193,7 @@ class ThreadService extends AbstractService<ThreadService> { return exceptions; } - Set<Stage> getThreadableStages() { + Set<AbstractStage> getThreadableStages() { return threadableStages; } diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java index 6bb62c647a640fa40cea23a800c1378d448538d7..a5dc4497d054671a2f5f928568e590b9bf29901e 100644 --- a/src/main/java/teetime/framework/Traverser.java +++ b/src/main/java/teetime/framework/Traverser.java @@ -47,7 +47,7 @@ public class Traverser { CONTINUE, STOP; } - private final Set<Stage> visitedStages = new HashSet<Stage>(); + private final Set<AbstractStage> visitedStages = new HashSet<AbstractStage>(); private final ITraverserVisitor traverserVisitor; private final Direction direction; @@ -61,7 +61,7 @@ public class Traverser { this.direction = direction; } - public void traverse(final Stage stage) { + public void traverse(final AbstractStage stage) { VisitorBehavior behavior = traverserVisitor.visit(stage); if (behavior == VisitorBehavior.STOP || !visitedStages.add(stage)) { return; @@ -93,7 +93,7 @@ public class Traverser { } } - public Set<Stage> getVisitedStages() { + public Set<AbstractStage> getVisitedStages() { return visitedStages; } } diff --git a/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java index 81cc2f0297258ddb89dadd5c094f9b28bc41ba71..e317fe51f33b8c0aa81fa933af487641b8845b4f 100644 --- a/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/AbstractExceptionListener.java @@ -21,12 +21,12 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; /** * Represents a minimalistic StageExceptionListener. * Listener which extend from this one, must a least implement this functionality. - * This abstract class provides a Logger {@link #logger} and the method {@link #onStageException(Exception, Stage)} which is called on every raised exception. + * This abstract class provides a Logger {@link #logger} and the method {@link #onStageException(Exception, AbstractStage)} which is called on every raised exception. */ public abstract class AbstractExceptionListener { @@ -57,13 +57,13 @@ public abstract class AbstractExceptionListener { * @return * true, if the thread should be terminated, false otherwise */ - public abstract FurtherExecution onStageException(Exception e, Stage throwingStage); + public abstract FurtherExecution onStageException(Exception e, AbstractStage throwingStage); public List<Exception> getLoggedExceptions() { return exceptionsList; } - public FurtherExecution reportException(final Exception e, final Stage stage) { + public FurtherExecution reportException(final Exception e, final AbstractStage stage) { if (logExceptions) { exceptionsList.add(e); } diff --git a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java index 3eb5445131482966918d4137e0faee2dc31ec070..e87cf8a77b92a6db7f9f6e2f7480cf1dd39f8412 100644 --- a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class IgnoringExceptionListener extends AbstractExceptionListener { @@ -24,7 +24,7 @@ class IgnoringExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { return FurtherExecution.CONTINUE; } } diff --git a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java index 8fee623cb29f4068f6d98cedcf68e1dab33cb1ba..125fabb0536fc73067f0ae3000da90b1972c24a8 100644 --- a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class LoggingExceptionListener extends AbstractExceptionListener { @@ -24,7 +24,7 @@ class LoggingExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { logger.warn("Exception occurred in " + throwingStage.getId(), e); return FurtherExecution.CONTINUE; } diff --git a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java index 2490c0e43ad4a15b4ffd47baf86521a642cd1c07..d7aab2a238bc2ccf4bf4ac4c3e6a2f059c6668c5 100644 --- a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java @@ -18,7 +18,7 @@ package teetime.framework.exceptionHandling; import java.util.ArrayList; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; class TerminatingExceptionListener extends AbstractExceptionListener { @@ -29,7 +29,7 @@ class TerminatingExceptionListener extends AbstractExceptionListener { } @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { if (logger.isWarnEnabled()) { logger.warn("Exception occurred in " + throwingStage.getId(), e); } diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 80469661a1a5d7ff68667135081f7888c668ccd7..f04d781619cc81e1699be9994530bd4f3716a107 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -19,11 +19,11 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public interface ISignal { - void trigger(Stage stage) throws Exception; + void trigger(AbstractStage stage) throws Exception; // Only used by the merger so far boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts); diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 38ad2ce4d4011c0f36adfe7ee583e1fd3932f324..3f8849820df7dbc654473977b4b1ae99131eda79 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,12 +19,12 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class StartingSignal implements ISignal { @Override - public void trigger(final Stage stage) throws Exception { + public void trigger(final AbstractStage stage) throws Exception { stage.onStarting(); } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index 211c57bb42fc7ad6deea6f5e5ad2818dbdbd906f..9411073630c4dbeb340cb3fcf07b12850033c7aa 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -19,12 +19,12 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class TerminatingSignal implements ISignal { @Override - public void trigger(final Stage stage) throws Exception { + public void trigger(final AbstractStage stage) throws Exception { stage.onTerminating(); } diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java index f85b4ada53900cf175025b1fed541f31b3837125..5e38b9d7aa3db2eb96cf8fbc0fffb2d4b2c8d054 100644 --- a/src/main/java/teetime/framework/signal/ValidatingSignal.java +++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Set; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.framework.validation.InvalidPortConnection; public final class ValidatingSignal implements ISignal { @@ -28,7 +28,7 @@ public final class ValidatingSignal implements ISignal { private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>(); @Override - public void trigger(final Stage stage) { + public void trigger(final AbstractStage stage) { stage.onValidating(this.invalidPortConnections); } diff --git a/src/main/java/teetime/framework/test/InputHolder.java b/src/main/java/teetime/framework/test/InputHolder.java index 29bce044008b00d25a704c3e2b644d6e1c2f3f68..9d58aae1b2e338882d4d9eb2b1635fa9eb7b8caa 100644 --- a/src/main/java/teetime/framework/test/InputHolder.java +++ b/src/main/java/teetime/framework/test/InputHolder.java @@ -16,18 +16,18 @@ package teetime.framework.test; import teetime.framework.InputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public final class InputHolder<I> { private final StageTester stageTester; - private final Stage stage; + private final AbstractStage stage; private final Iterable<Object> input; private InputPort<Object> port; @SuppressWarnings("unchecked") - InputHolder(final StageTester stageTester, final Stage stage, final Iterable<I> input) { + InputHolder(final StageTester stageTester, final AbstractStage stage, final Iterable<I> input) { this.stageTester = stageTester; this.stage = stage; this.input = (Iterable<Object>) input; diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 10313c679335d1f9b187fab647e69c6b2f54037f..d0d23b14598bf078623cba9a91e6ac3977d191ea 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.framework.StageState; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -36,13 +36,13 @@ public final class StageTester { private final List<InputHolder<?>> inputHolders = new ArrayList<InputHolder<?>>(); private final List<OutputHolder<?>> outputHolders = new ArrayList<OutputHolder<?>>(); - private final Stage stage; + private final AbstractStage stage; - private StageTester(final Stage stage) { + private StageTester(final AbstractStage stage) { this.stage = stage; } - public static StageTester test(final Stage stage) { + public static StageTester test(final AbstractStage stage) { if (stage.getCurrentState() != StageState.CREATED) { throw new AssertionError("This stage has already been tested in this test method. Move this test into a new test method."); } @@ -85,7 +85,7 @@ public final class StageTester { private final class TestConfiguration extends Configuration { - public TestConfiguration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { + public TestConfiguration(final List<InputHolder<?>> inputHolders, final AbstractStage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); connectPorts(producer.getOutputPort(), inputHolder.getPort()); diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 4692ea3c5c2bbe300b40958acfb58550d181dbb2..9c82547ee2d1d91159548eca021afb997c509fcf 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.framework.AbstractCompositeStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.stage.EveryXthStage; import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; @@ -29,7 +29,7 @@ import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; public final class EveryXthPrinter<T> extends AbstractCompositeStage { private final Distributor<T> distributor; - private final List<Stage> lastStages = new ArrayList<Stage>(); + private final List<AbstractStage> lastStages = new ArrayList<AbstractStage>(); public EveryXthPrinter(final int threshold) { distributor = new Distributor<T>(new CopyByReferenceStrategy()); @@ -50,7 +50,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { return distributor.getNewOutputPort(); } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return distributor; } diff --git a/src/main/java/teetime/util/framework/port/PortAction.java b/src/main/java/teetime/util/framework/port/PortAction.java index 52188070b1874f4d4e449cad37f2c46ffa9d3b71..aec07b57786939045f2ac4ead6dbf8ec8dcd757b 100644 --- a/src/main/java/teetime/util/framework/port/PortAction.java +++ b/src/main/java/teetime/util/framework/port/PortAction.java @@ -15,9 +15,9 @@ */ package teetime.util.framework.port; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; -public interface PortAction<T extends Stage> { +public interface PortAction<T extends AbstractStage> { public abstract void execute(final T stage); diff --git a/src/main/java/teetime/util/framework/port/PortActionHelper.java b/src/main/java/teetime/util/framework/port/PortActionHelper.java index c87aebc6c84870787f6746ed67a82d135329fb1a..de509c6c4cf61e6d1c186d787ee8f37bc44509e3 100644 --- a/src/main/java/teetime/util/framework/port/PortActionHelper.java +++ b/src/main/java/teetime/util/framework/port/PortActionHelper.java @@ -23,7 +23,7 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.util.framework.concurrent.queue.PCBlockingQueue; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; @@ -44,14 +44,14 @@ public final class PortActionHelper { return portActions; } - public static <T extends Stage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) { + public static <T extends AbstractStage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) { PortAction<T> dynamicPortAction = portActions.poll(); if (null != dynamicPortAction) { dynamicPortAction.execute(stage); } } - public static <T extends Stage> void checkBlockingForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) + public static <T extends AbstractStage> void checkBlockingForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) throws InterruptedException { PortAction<T> dynamicPortAction = portActions.take(); dynamicPortAction.execute(stage); diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/AbstractStageTest.java similarity index 95% rename from src/test/java/teetime/framework/StageTest.java rename to src/test/java/teetime/framework/AbstractStageTest.java index 3081b34678e7e3d81ce154d6ec65b9335f61d708..ed883704cddd726ff630e4bd998bb808d910f776 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/AbstractStageTest.java @@ -29,11 +29,11 @@ import teetime.stage.Cache; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; -public class StageTest { +public class AbstractStageTest { @Test public void testId() { - Stage.clearInstanceCounters(); + AbstractStage.clearInstanceCounters(); Counter<Object> counter0 = new Counter<Object>(); Counter<Object> counter1 = new Counter<Object>(); @@ -77,8 +77,6 @@ public class StageTest { private final long delayInMs; - public boolean finished; - public DelayAndTerminate(final long delayInMs) { super(); this.delayInMs = delayInMs; @@ -89,8 +87,8 @@ public class StageTest { try { Thread.sleep(delayInMs); } catch (InterruptedException e) { + throw new IllegalStateException(e); } - finished = true; } } diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java index b4f3c0de2e55ca314a1ab88ddf10e5a7c4610d0b..d8d74d9476f63d06225afe1ce8e39958d08b2eb2 100644 --- a/src/test/java/teetime/framework/TraverserTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -45,7 +45,7 @@ public class TraverserTest { Traverser traversor = new Traverser(new IntraStageCollector(tc.init)); traversor.traverse(tc.init); - Set<Stage> comparingStages = new HashSet<Stage>(); + Set<AbstractStage> comparingStages = new HashSet<AbstractStage>(); comparingStages.add(tc.init); comparingStages.add(tc.f2b); comparingStages.add(tc.distributor); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 507dc935b7fa48f71e161634a9cf943e1a8e6b81..9dbe7e68914e60dee050fec3ab656809298ddc2d 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -28,10 +28,10 @@ class WaitStrategyConfiguration extends Configuration { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { - Stage producer = buildProducer(elements); + AbstractStage producer = buildProducer(elements); producer.declareActive(); - Stage consumer = buildConsumer(delay); + AbstractStage consumer = buildConsumer(delay); consumer.declareActive(); Clock clock = buildClock(initialDelayInMs, delay); @@ -47,7 +47,7 @@ class WaitStrategyConfiguration extends Configuration { return clock; } - private Stage buildProducer(final Object... elements) { + private AbstractStage buildProducer(final Object... elements) { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index a44f45bf89d7b818bbef0c59bb4482a3ca471c49..1387bc4191ebc21a75fb02e72ecd43ad5db0dd96 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -28,7 +28,7 @@ class YieldStrategyConfiguration extends Configuration { InitialElementProducer<Object> producer = buildProducer(elements); producer.declareActive(); - Stage consumer = buildConsumer(producer); + AbstractStage consumer = buildConsumer(producer); consumer.declareActive(); } diff --git a/src/test/java/teetime/framework/exceptionHandling/TestListener.java b/src/test/java/teetime/framework/exceptionHandling/TestListener.java index 06b32bcd2c3971fe9838776229c0b651ba68fc7e..79f3dd549dccd88715e62d3ecabd3c7d5b3d51aa 100644 --- a/src/test/java/teetime/framework/exceptionHandling/TestListener.java +++ b/src/test/java/teetime/framework/exceptionHandling/TestListener.java @@ -15,7 +15,7 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; public class TestListener extends AbstractExceptionListener { @@ -26,7 +26,7 @@ public class TestListener extends AbstractExceptionListener { private int numExceptionsInvoked; @Override - public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { + public FurtherExecution onStageException(final Exception e, final AbstractStage throwingStage) { numExceptionsInvoked++; if (numExceptionsInvoked == 2) { return FurtherExecution.TERMINATE; diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index f8df4b0751389d020d4eb695adac3c5853f7f874..19db569dc688626f31516355db38a5229616ca8b 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -28,7 +28,7 @@ import org.junit.Test; import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.OutputPort; -import teetime.framework.Stage; +import teetime.framework.AbstractStage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.util.framework.port.PortAction; @@ -117,7 +117,7 @@ public class DynamicDistributorTest { } private void assertValuesForIndex(final PortAction<DynamicDistributor<Integer>> ia, final List<Integer> values) { - Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); + AbstractStage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage(); @SuppressWarnings("unchecked") CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;