diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index c5d3eb428f2e4f1e8d6f1c26f25660c69d587232..379181a9d2bf548b0c49d5c7f299952f4165e56c 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -61,14 +61,7 @@ abstract class AbstractRunnableStage implements Runnable { this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); if (failed) { - if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { - TerminatingSignal signal = new TerminatingSignal(); - // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival - InputPort<?>[] inputPorts = stage.getInputPorts(); - for (int i = 0; i < inputPorts.length; i++) { - stage.onSignal(signal, inputPorts[i]); - } - } + sendTerminatingSignal(); throw new IllegalStateException("Terminated by StageExceptionListener"); } @@ -76,6 +69,16 @@ abstract class AbstractRunnableStage implements Runnable { // stage.owningContext.getThreadCounter().dec(); } + private void sendTerminatingSignal() { + if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { + TerminatingSignal signal = new TerminatingSignal(); + // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival + for (InputPort<?> inputPort : stage.getInputPorts()) { + stage.onSignal(signal, inputPort); + } + } + } + protected abstract void beforeStageExecution() throws InterruptedException; protected abstract void executeStage(); diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 934a2c2bcdc6f6c332229ac99016eebf6642d63c..e48efb0651d41cd6f0e86911cc6f630e5dab2d07 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -15,8 +15,6 @@ */ package teetime.framework; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -25,6 +23,8 @@ import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; +import teetime.util.framework.port.PortList; +import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { @@ -32,21 +32,18 @@ public abstract class AbstractStage extends Stage { private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); - private InputPort<?>[] inputPorts = new InputPort<?>[0]; - private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; + private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); + private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); private StageState currentState = StageState.CREATED; - private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>(); - private final Set<InputPortRemovedListener> inputPortsRemovedListeners = new HashSet<InputPortRemovedListener>(); - @Override - public InputPort<?>[] getInputPorts() { - return inputPorts; + protected List<InputPort<?>> getInputPorts() { + return inputPorts.getOpenedPorts(); } @Override - protected OutputPort<?>[] getOutputPorts() { - return this.outputPorts; + protected List<OutputPort<?>> getOutputPorts() { + return outputPorts.getOpenedPorts(); } @Override @@ -62,7 +59,7 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { outputPort.sendSignal(signal); } @@ -99,7 +96,7 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.outputPorts) { + for (OutputPort<?> outputPort : this.outputPorts.getOpenedPorts()) { if (null == outputPort.getPipe()) { // if port is unconnected if (logger.isInfoEnabled()) { this.logger.info("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); @@ -189,7 +186,7 @@ public abstract class AbstractStage extends Stage { */ protected <T> InputPort<T> createInputPort(final Class<T> type, final String name) { final InputPort<T> inputPort = new InputPort<T>(type, this, name); - inputPorts = addElementToArray(inputPort, inputPorts); + inputPorts.add(inputPort); return inputPort; } @@ -252,20 +249,14 @@ public abstract class AbstractStage extends Stage { */ protected <T> OutputPort<T> createOutputPort(final Class<T> type, final String name) { final OutputPort<T> outputPort = new OutputPort<T>(type, this, name); - outputPorts = addElementToArray(outputPort, outputPorts); + outputPorts.add(outputPort); return outputPort; } - private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) { - T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1); - newOutputPorts[srcArray.length] = element; - return newOutputPorts; - } - @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { final IPipe pipe = outputPort.getPipe(); final Class<?> sourcePortType = outputPort.getType(); @@ -293,67 +284,33 @@ public abstract class AbstractStage extends Stage { } protected <T> DynamicOutputPort<T> createDynamicOutputPort() { - final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length); - outputPorts = addElementToArray(outputPort, outputPorts); + final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size()); + outputPorts.add(outputPort); return outputPort; } protected <T> DynamicInputPort<T> createDynamicInputPort() { - final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.length); - inputPorts = addElementToArray(inputPort, inputPorts); + final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size()); + inputPorts.add(inputPort); return inputPort; } @Override protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { - int index = dynamicOutputPort.getIndex(); - List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts)); - OutputPort<?> removedOutputPort = tempOutputPorts.remove(index); - for (int i = index; i < tempOutputPorts.size(); i++) { - OutputPort<?> outputPort = tempOutputPorts.get(i); - if (outputPort instanceof DynamicOutputPort) { - ((DynamicOutputPort<?>) outputPort).setIndex(i); - } - } - outputPorts = tempOutputPorts.toArray(new OutputPort[tempOutputPorts.size()]); - - firePortRemoved(removedOutputPort); + outputPorts.remove(dynamicOutputPort); // TODO update setIndex IF it is still used } - private void firePortRemoved(final OutputPort<?> removedOutputPort) { - for (OutputPortRemovedListener listener : outputPortRemovedListeners) { - listener.onOutputPortRemoved(this, removedOutputPort); - } - } - - protected final void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) { - outputPortRemovedListeners.add(outputPortRemovedListener); + protected final void addOutputPortRemovedListener(final PortRemovedListener<OutputPort<?>> outputPortRemovedListener) { + outputPorts.addPortRemovedListener(outputPortRemovedListener); } @Override protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { - int index = dynamicInputPort.getIndex(); - List<InputPort<?>> tempInputPorts = new ArrayList<InputPort<?>>(Arrays.asList(inputPorts)); - InputPort<?> removedInputPort = tempInputPorts.remove(index); - for (int i = index; i < tempInputPorts.size(); i++) { - InputPort<?> inputPort = tempInputPorts.get(i); - if (inputPort instanceof DynamicInputPort) { - ((DynamicInputPort<?>) inputPort).setIndex(i); - } - } - inputPorts = tempInputPorts.toArray(new InputPort[0]); - - firePortRemoved(removedInputPort); - } - - private void firePortRemoved(final InputPort<?> removedInputPort) { - for (InputPortRemovedListener listener : inputPortsRemovedListeners) { - listener.onInputPortRemoved(this, removedInputPort); - } + inputPorts.remove(dynamicInputPort); // TODO update setIndex IF it is still used } - protected final void addInputPortRemovedListener(final InputPortRemovedListener outputPortRemovedListener) { - inputPortsRemovedListeners.add(outputPortRemovedListener); + protected final void addInputPortRemovedListener(final PortRemovedListener<InputPort<?>> inputPortRemovedListener) { + inputPorts.addPortRemovedListener(inputPortRemovedListener); } } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index b69624cec422c8578b9ea105af3fe95ad14398d9..a4668f080c466c0a0019e3c881dcaa180b600d86 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -57,7 +57,7 @@ final class ConfigurationContext { * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().length == 0) { + if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (!threadableStages.containsKey(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index 84b4b419b81ad190869b0392d87966cd354e72b8..acc8e99013e237cc05f366de2d8637613086d5b3 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -22,7 +22,7 @@ public class DynamicActuator { */ @Deprecated public AbstractRunnableStage wrap(final Stage stage) { - if (stage.getInputPorts().length > 0) { + if (stage.getInputPorts().size() > 0) { return new RunnableConsumerStage(stage); } return new RunnableProducerStage(stage); diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index b7337be19a3612fceb1a90755174f5a803fe5dad..5c1ee1c432b724c688dbb30a4da10bdbcfddd762 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -122,9 +122,9 @@ public abstract class Stage { this.owningThread = owningThread; } - protected abstract InputPort<?>[] getInputPorts(); + protected abstract List<InputPort<?>> getInputPorts(); - protected abstract OutputPort<?>[] getOutputPorts(); + protected abstract List<OutputPort<?>> getOutputPorts(); // events diff --git a/src/main/java/teetime/framework/Traversor.java b/src/main/java/teetime/framework/Traversor.java index 7621cd06350064991edf77b6454b582f61c569e9..f8a55067ce527d3c86603170ceaa76d0c76048de 100644 --- a/src/main/java/teetime/framework/Traversor.java +++ b/src/main/java/teetime/framework/Traversor.java @@ -35,8 +35,7 @@ public class Traversor { return; } - OutputPort<?>[] outputPorts = stage.getOutputPorts(); - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { IPipe pipe = outputPort.getPipe(); if (null != pipe && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { Stage owningStage = pipe.getTargetPort().getOwningStage(); diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 8a64cf87a4d2bb5b9406c47c12fbe3c01b868399..13ecf747a31ae2d69a8d697267e8b7cf81e51f4b 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -25,5 +26,6 @@ public interface ISignal { void trigger(Stage stage); // Only used by the merger so far - boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, InputPort<?>[] allInputPorts); + boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts); + } diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java index 938255333ed448b5ec0c75af85234fbe4941e8ae..5de5d00fdf15cc5b49a1bfe13c2ead05edebfab7 100644 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ b/src/main/java/teetime/framework/signal/InitializingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,7 +34,7 @@ public final class InitializingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 0554379e39ea89632e0ab1f23391b841d0cfbec9..375952f009c1506bd02b42c4a69d3094519f6af7 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,7 +34,7 @@ public final class StartingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index ad7ee82d252d8aa312295530ce885dfb6301cc28..5823902b87110544e213e0302c23e967e00ec9f4 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,8 +34,8 @@ public final class TerminatingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { - return receivedInputPorts.size() == allInputPorts.length; + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { + return receivedInputPorts.size() == allInputPorts.size(); } } diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java index 599b530d89e5429b9aae0cdc034769db9dfa52da..f85b4ada53900cf175025b1fed541f31b3837125 100644 --- a/src/main/java/teetime/framework/signal/ValidatingSignal.java +++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java @@ -37,7 +37,7 @@ public final class ValidatingSignal implements ISignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index cae215321a0c9dc885e0f46e524cb53beb5de1ef..53e2027083729588210703487d4f0fd18d088c7b 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -15,6 +15,8 @@ */ package teetime.stage.basic.distributor; +import java.util.List; + import teetime.framework.AbstractConsumerStage; import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; @@ -42,10 +44,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> { addOutputPortRemovedListener(strategy); } - @SuppressWarnings("unchecked") @Override protected void execute(final T element) { - this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); + this.strategy.distribute(this.getOutputPorts(), element); } public DynamicOutputPort<T> getNewOutputPort() { @@ -61,7 +62,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { } @Override - public OutputPort<?>[] getOutputPorts() { + public List<OutputPort<?>> getOutputPorts() { // make public return super.getOutputPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index aa91d05b824ad9fcf7ca668057098a8413b4a296..a05b488265c6ba998148d019a873b85facd7eb4b 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -19,16 +19,15 @@ import java.util.concurrent.BlockingQueue; import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; -import teetime.framework.OutputPortRemovedListener; -import teetime.framework.Stage; import teetime.framework.signal.TerminatingSignal; import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; import teetime.util.framework.port.PortAction; import teetime.util.framework.port.PortActionHelper; +import teetime.util.framework.port.PortRemovedListener; -public class DynamicDistributor<T> extends Distributor<T> implements OutputPortRemovedListener { +public class DynamicDistributor<T> extends Distributor<T> implements PortRemovedListener<OutputPort<?>> { protected final BlockingQueue<PortAction<DynamicDistributor<T>>> portActions; @@ -66,7 +65,7 @@ public class DynamicDistributor<T> extends Distributor<T> implements OutputPortR } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { removedOutputPort.sendSignal(new TerminatingSignal()); } } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java index c163559816c587b195427d41b1b5835e0008ee7c..94b85ec184810cd5b899b7299fec079aa4f58981 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import teetime.framework.OutputPort; -import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -31,10 +30,12 @@ import teetime.framework.Stage; */ public final class CloneStrategy implements IDistributorStrategy { + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> outputPort : outputPorts) { - outputPort.send(clone(element)); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + for (final OutputPort<?> outputPort : outputPorts) { + T clonedElement = clone(element); + ((OutputPort<T>) outputPort).send(clonedElement); } return true; @@ -111,7 +112,7 @@ public final class CloneStrategy implements IDistributorStrategy { } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { // do nothing } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java index fc21b06070513f77e9b469bcec0f5c895bb2de9f..f8ef52a24435573a7e1b51b8159112ae4d53b804 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -25,17 +26,19 @@ import teetime.framework.Stage; */ public final class CopyByReferenceStrategy implements IDistributorStrategy { + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> outputPort : outputPorts) { - outputPort.send(element); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + for (final OutputPort<?> outputPort : outputPorts) { + ((OutputPort<T>) outputPort).send(element); } return true; } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { // do nothing } + } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java index b04fd172248e09013e58aaace05555c6dd5d8a0d..c81d9c4c3189a78ea1b8eedf84027919eafeaf2c 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java @@ -15,16 +15,18 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.OutputPortRemovedListener; +import teetime.util.framework.port.PortRemovedListener; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public interface IDistributorStrategy extends OutputPortRemovedListener { +public interface IDistributorStrategy extends PortRemovedListener<OutputPort<?>> { - public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element); } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java index cbb0f3b0fe36ff7ece8942068de22353ef9f2c75..4c5882abfe5e1e033311b374b7e96e4163218792 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.basic.distributor.Distributor; /** @@ -28,28 +29,29 @@ public final class RoundRobinStrategy implements IDistributorStrategy { private int index; + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + final OutputPort<T> outputPort = (OutputPort<T>) this.getNextPortInRoundRobinOrder(outputPorts); outputPort.send(element); return true; } - private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { - final OutputPort<T> outputPort = outputPorts[this.index]; + private OutputPort<?> getNextPortInRoundRobinOrder(final List<OutputPort<?>> outputPorts) { + final OutputPort<?> outputPort = outputPorts.get(this.index); - this.index = (this.index + 1) % outputPorts.length; + this.index = (this.index + 1) % outputPorts.size(); return outputPort; } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { - Distributor<?> distributor = (Distributor<?>) stage; + public void onPortRemoved(final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = this.index % distributor.getOutputPorts().length; + this.index = this.index % distributor.getOutputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java index df81ac0da6f2ca2d7cc565a26cd6d002fe61c39d..9abe0b0af628f64d48212b6a3d138728ddbf5b0a 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.basic.distributor.Distributor; /** @@ -29,15 +30,16 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { private int index; private int numWaits; + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - final int numOutputPorts = outputPorts.length; + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + final int numOutputPorts = outputPorts.size(); int numLoops = numOutputPorts; boolean success; OutputPort<T> outputPort; do { - outputPort = getNextPortInRoundRobinOrder(outputPorts); + outputPort = (OutputPort<T>) getNextPortInRoundRobinOrder(outputPorts); success = outputPort.sendNonBlocking(element); if (0 == numLoops) { numWaits++; @@ -60,10 +62,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { // Thread.yield(); } - private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { - final OutputPort<T> outputPort = outputPorts[this.index]; + private OutputPort<?> getNextPortInRoundRobinOrder(final List<OutputPort<?>> outputPorts) { + final OutputPort<?> outputPort = outputPorts.get(this.index); - this.index = (this.index + 1) % outputPorts.length; + this.index = (this.index + 1) % outputPorts.size(); return outputPort; } @@ -73,10 +75,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { - Distributor<?> distributor = (Distributor<?>) stage; + public void onPortRemoved(final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = this.index % distributor.getOutputPorts().length; + this.index = this.index % distributor.getOutputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index c82d290bb5ca46869f7baf737b2cd561a4480a25..2a28fed8b470c970b8a4dca3fca8759f067537de 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -17,6 +17,7 @@ package teetime.stage.basic.merger; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -42,10 +43,10 @@ import teetime.stage.basic.merger.strategy.RoundRobinStrategy; */ public class Merger<T> extends AbstractStage { + private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; private final OutputPort<T> outputPort = this.createOutputPort(); private final IMergerStrategy strategy; - private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; public Merger() { this(new RoundRobinStrategy()); @@ -106,10 +107,9 @@ public class Merger<T> extends AbstractStage { return this.strategy; } - @SuppressWarnings("unchecked") @Override - public InputPort<T>[] getInputPorts() { // make public - return (InputPort<T>[]) super.getInputPorts(); + public List<InputPort<?>> getInputPorts() { // make public + return super.getInputPorts(); } public DynamicInputPort<T> getNewInputPort() { diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java index 334b8c4f5c061d60c327cc7b9c970e6113ec4ab3..d3fc845ab3389f767faa4bac6601f0f2cbf10e42 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -15,6 +15,8 @@ */ package teetime.stage.basic.merger.dynamic; +import java.util.List; + import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.util.framework.port.PortAction; @@ -34,8 +36,8 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { if (null == inputPort) { // for testing purposes only - InputPort<?>[] inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); - inputPortsToRemove = inputPorts[inputPorts.length - 1]; + List<InputPort<?>> inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); + inputPortsToRemove = inputPorts.get(inputPorts.size() - 1); } else { inputPortsToRemove = inputPort; } diff --git a/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java index 49a73b0a02dacaff4e27f0d21485c7b91a451161..7bddd425f7225154855eddce5726bc7fcfe7da50 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.merger.strategy; +import java.util.List; + import teetime.framework.InputPort; -import teetime.framework.Stage; import teetime.stage.basic.merger.Merger; /** @@ -30,39 +31,40 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { @Override public <T> T getNextInput(final Merger<T> merger) { - final InputPort<T>[] inputPorts = merger.getInputPorts(); - final InputPort<T> inputPort = getOpenInputPort(inputPorts); + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + final InputPort<?> inputPort = getOpenInputPort(inputPorts); if (null == inputPort) { return null; } - final T token = inputPort.receive(); + @SuppressWarnings("unchecked") + final T token = (T) inputPort.receive(); if (null != token) { - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); } return token; } - private <T> InputPort<T> getOpenInputPort(final InputPort<T>[] inputPorts) { + private InputPort<?> getOpenInputPort(final List<InputPort<?>> inputPorts) { final int startedIndex = index; - InputPort<T> inputPort = inputPorts[this.index]; + InputPort<?> inputPort = inputPorts.get(this.index); while (inputPort.isClosed()) { - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); if (index == startedIndex) { return null; } - inputPort = inputPorts[this.index]; + inputPort = inputPorts.get(this.index); } return inputPort; } @Override - public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { - Merger<?> merger = (Merger<?>) stage; + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = (this.index + 1) % merger.getInputPorts().length; + this.index = (this.index + 1) % merger.getInputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java index 428f9f993f2dbb9903b2e47c14df9872b0d5dead..6d4e6dbcc6ba376065384d7c1b1a78ffe3a3ab7c 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java @@ -15,15 +15,16 @@ */ package teetime.stage.basic.merger.strategy; -import teetime.framework.InputPortRemovedListener; +import teetime.framework.InputPort; import teetime.stage.basic.merger.Merger; +import teetime.util.framework.port.PortRemovedListener; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public interface IMergerStrategy extends InputPortRemovedListener { +public interface IMergerStrategy extends PortRemovedListener<InputPort<?>> { public <T> T getNextInput(Merger<T> merger); diff --git a/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java index d67b1b0f2f4032259eced6a72b83e3ee40b23fea..9af8305eee9abf9af422469d0b34a657fb252c68 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.merger.strategy; +import java.util.List; + import teetime.framework.InputPort; -import teetime.framework.Stage; import teetime.stage.basic.merger.Merger; /** @@ -30,12 +31,13 @@ public final class RoundRobinStrategy implements IMergerStrategy { @Override public <T> T getNextInput(final Merger<T> merger) { - final InputPort<T>[] inputPorts = merger.getInputPorts(); - int size = inputPorts.length; + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + int size = inputPorts.size(); // check each port at most once to avoid a potentially infinite loop while (size-- > 0) { - InputPort<T> inputPort = this.getNextPortInRoundRobinOrder(inputPorts); - final T token = inputPort.receive(); + InputPort<?> inputPort = this.getNextPortInRoundRobinOrder(inputPorts); + @SuppressWarnings("unchecked") + final T token = (T) inputPort.receive(); if (token != null) { return token; } @@ -43,19 +45,19 @@ public final class RoundRobinStrategy implements IMergerStrategy { return null; } - private <T> InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) { - InputPort<T> inputPort = inputPorts[this.index]; + private InputPort<?> getNextPortInRoundRobinOrder(final List<InputPort<?>> inputPorts) { + InputPort<?> inputPort = inputPorts.get(this.index); - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); return inputPort; } @Override - public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { - Merger<?> merger = (Merger<?>) stage; + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = (this.index + 1) % merger.getInputPorts().length; + this.index = (this.index + 1) % merger.getInputPorts().size(); } } diff --git a/src/main/java/teetime/util/framework/port/PortList.java b/src/main/java/teetime/util/framework/port/PortList.java new file mode 100644 index 0000000000000000000000000000000000000000..a7d14053f609ff06c8fa559fe7e54f6bfa7fc622 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortList.java @@ -0,0 +1,62 @@ +package teetime.util.framework.port; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import teetime.framework.AbstractPort; + +public class PortList<T extends AbstractPort<?>> { + + private final List<T> openedPorts = new ArrayList<T>(); + + // private final List<T> closedPorts = new ArrayList<T>(); + + private final Set<PortRemovedListener<T>> portsRemovedListeners = new HashSet<PortRemovedListener<T>>(); + + public List<T> getOpenedPorts() { + return openedPorts; + } + + // public List<T> getClosedPorts() { + // return closedPorts; + // } + + public boolean add(final T port) { + return openedPorts.add(port); + } + + public boolean remove(final T port) { + boolean removed = openedPorts.remove(port); // BETTER remove by index for performance reasons + firePortRemoved(port); + return removed; + } + + public boolean close(final T port) { + boolean removed = remove(port); + // if (removed) { + // boolean added = closedPorts.add(port); + // if (added) { + // return true; + // } + // openedPorts.add(port); + // } + return removed; + } + + public int size() { + return openedPorts.size(); + } + + private void firePortRemoved(final T removedPort) { + for (PortRemovedListener<T> listener : portsRemovedListeners) { + listener.onPortRemoved(removedPort); + } + } + + public void addPortRemovedListener(final PortRemovedListener<T> listener) { + portsRemovedListeners.add(listener); + } + +} diff --git a/src/main/java/teetime/util/framework/port/PortRemovedListener.java b/src/main/java/teetime/util/framework/port/PortRemovedListener.java new file mode 100644 index 0000000000000000000000000000000000000000..e716f4591a8190d54c0c6acbdd2e489ad3c824b2 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortRemovedListener.java @@ -0,0 +1,8 @@ +package teetime.util.framework.port; + +import teetime.framework.AbstractPort; + +public interface PortRemovedListener<T extends AbstractPort<?>> { + + void onPortRemoved(T removedPort); +} diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 60ace19a1ac19234a520581cee2bdfd4cdea09f1..35938bb5a6446fa3b3855989d0e3d915d10096b3 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -44,11 +44,14 @@ public class TraversorTest { TestConfiguration tc = new TestConfiguration(); new Execution<TestConfiguration>(tc); traversor.traverse(tc.init); + Set<Stage> comparingStages = new HashSet<Stage>(); comparingStages.add(tc.init); comparingStages.add(tc.f2b); comparingStages.add(tc.distributor); - assertThat(tc.distributor.getOwningThread(), is(not(tc.distributor.getOutputPorts()[0].pipe.getTargetPort().getOwningStage().getOwningThread()))); + + OutputPort<?> distributorOutputPort0 = tc.distributor.getOutputPorts().get(0); + assertThat(tc.distributor.getOwningThread(), is(not(distributorOutputPort0.pipe.getTargetPort().getOwningStage().getOwningThread()))); assertEquals(comparingStages, traversor.getVisitedStage()); }