From 3defde0ac696ef40a7cab298cb2d6a55c2963648 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Sat, 11 Jul 2015 21:25:10 +0200 Subject: [PATCH] adapted port API; added PortList --- .../framework/AbstractRunnableStage.java | 19 ++-- .../java/teetime/framework/AbstractStage.java | 89 +++++-------------- .../framework/ConfigurationContext.java | 2 +- .../teetime/framework/DynamicActuator.java | 2 +- src/main/java/teetime/framework/Stage.java | 4 +- .../java/teetime/framework/Traversor.java | 3 +- .../teetime/framework/signal/ISignal.java | 4 +- .../framework/signal/InitializingSignal.java | 3 +- .../framework/signal/StartingSignal.java | 3 +- .../framework/signal/TerminatingSignal.java | 5 +- .../framework/signal/ValidatingSignal.java | 2 +- .../stage/basic/distributor/Distributor.java | 7 +- .../dynamic/DynamicDistributor.java | 7 +- .../distributor/strategy/CloneStrategy.java | 11 +-- .../strategy/CopyByReferenceStrategy.java | 13 +-- .../strategy/IDistributorStrategy.java | 10 ++- .../strategy/RoundRobinStrategy.java | 20 +++-- .../strategy/RoundRobinStrategy2.java | 22 ++--- .../teetime/stage/basic/merger/Merger.java | 8 +- .../merger/dynamic/RemovePortAction.java | 6 +- .../BusyWaitingRoundRobinStrategy.java | 26 +++--- .../merger/strategy/IMergerStrategy.java | 7 +- .../merger/strategy/RoundRobinStrategy.java | 24 ++--- .../teetime/util/framework/port/PortList.java | 62 +++++++++++++ .../framework/port/PortRemovedListener.java | 8 ++ .../java/teetime/framework/TraversorTest.java | 5 +- 26 files changed, 213 insertions(+), 159 deletions(-) create mode 100644 src/main/java/teetime/util/framework/port/PortList.java create mode 100644 src/main/java/teetime/util/framework/port/PortRemovedListener.java diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index c5d3eb42..379181a9 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -61,14 +61,7 @@ abstract class AbstractRunnableStage implements Runnable { this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); if (failed) { - if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { - TerminatingSignal signal = new TerminatingSignal(); - // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival - InputPort<?>[] inputPorts = stage.getInputPorts(); - for (int i = 0; i < inputPorts.length; i++) { - stage.onSignal(signal, inputPorts[i]); - } - } + sendTerminatingSignal(); throw new IllegalStateException("Terminated by StageExceptionListener"); } @@ -76,6 +69,16 @@ abstract class AbstractRunnableStage implements Runnable { // stage.owningContext.getThreadCounter().dec(); } + private void sendTerminatingSignal() { + if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { + TerminatingSignal signal = new TerminatingSignal(); + // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival + for (InputPort<?> inputPort : stage.getInputPorts()) { + stage.onSignal(signal, inputPort); + } + } + } + protected abstract void beforeStageExecution() throws InterruptedException; protected abstract void executeStage(); diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 934a2c2b..e48efb06 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -15,8 +15,6 @@ */ package teetime.framework; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -25,6 +23,8 @@ import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; +import teetime.util.framework.port.PortList; +import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { @@ -32,21 +32,18 @@ public abstract class AbstractStage extends Stage { private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); - private InputPort<?>[] inputPorts = new InputPort<?>[0]; - private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; + private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); + private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); private StageState currentState = StageState.CREATED; - private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>(); - private final Set<InputPortRemovedListener> inputPortsRemovedListeners = new HashSet<InputPortRemovedListener>(); - @Override - public InputPort<?>[] getInputPorts() { - return inputPorts; + protected List<InputPort<?>> getInputPorts() { + return inputPorts.getOpenedPorts(); } @Override - protected OutputPort<?>[] getOutputPorts() { - return this.outputPorts; + protected List<OutputPort<?>> getOutputPorts() { + return outputPorts.getOpenedPorts(); } @Override @@ -62,7 +59,7 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { outputPort.sendSignal(signal); } @@ -99,7 +96,7 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.outputPorts) { + for (OutputPort<?> outputPort : this.outputPorts.getOpenedPorts()) { if (null == outputPort.getPipe()) { // if port is unconnected if (logger.isInfoEnabled()) { this.logger.info("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); @@ -189,7 +186,7 @@ public abstract class AbstractStage extends Stage { */ protected <T> InputPort<T> createInputPort(final Class<T> type, final String name) { final InputPort<T> inputPort = new InputPort<T>(type, this, name); - inputPorts = addElementToArray(inputPort, inputPorts); + inputPorts.add(inputPort); return inputPort; } @@ -252,20 +249,14 @@ public abstract class AbstractStage extends Stage { */ protected <T> OutputPort<T> createOutputPort(final Class<T> type, final String name) { final OutputPort<T> outputPort = new OutputPort<T>(type, this, name); - outputPorts = addElementToArray(outputPort, outputPorts); + outputPorts.add(outputPort); return outputPort; } - private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) { - T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1); - newOutputPorts[srcArray.length] = element; - return newOutputPorts; - } - @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { final IPipe pipe = outputPort.getPipe(); final Class<?> sourcePortType = outputPort.getType(); @@ -293,67 +284,33 @@ public abstract class AbstractStage extends Stage { } protected <T> DynamicOutputPort<T> createDynamicOutputPort() { - final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length); - outputPorts = addElementToArray(outputPort, outputPorts); + final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size()); + outputPorts.add(outputPort); return outputPort; } protected <T> DynamicInputPort<T> createDynamicInputPort() { - final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.length); - inputPorts = addElementToArray(inputPort, inputPorts); + final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size()); + inputPorts.add(inputPort); return inputPort; } @Override protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { - int index = dynamicOutputPort.getIndex(); - List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts)); - OutputPort<?> removedOutputPort = tempOutputPorts.remove(index); - for (int i = index; i < tempOutputPorts.size(); i++) { - OutputPort<?> outputPort = tempOutputPorts.get(i); - if (outputPort instanceof DynamicOutputPort) { - ((DynamicOutputPort<?>) outputPort).setIndex(i); - } - } - outputPorts = tempOutputPorts.toArray(new OutputPort[tempOutputPorts.size()]); - - firePortRemoved(removedOutputPort); + outputPorts.remove(dynamicOutputPort); // TODO update setIndex IF it is still used } - private void firePortRemoved(final OutputPort<?> removedOutputPort) { - for (OutputPortRemovedListener listener : outputPortRemovedListeners) { - listener.onOutputPortRemoved(this, removedOutputPort); - } - } - - protected final void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) { - outputPortRemovedListeners.add(outputPortRemovedListener); + protected final void addOutputPortRemovedListener(final PortRemovedListener<OutputPort<?>> outputPortRemovedListener) { + outputPorts.addPortRemovedListener(outputPortRemovedListener); } @Override protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { - int index = dynamicInputPort.getIndex(); - List<InputPort<?>> tempInputPorts = new ArrayList<InputPort<?>>(Arrays.asList(inputPorts)); - InputPort<?> removedInputPort = tempInputPorts.remove(index); - for (int i = index; i < tempInputPorts.size(); i++) { - InputPort<?> inputPort = tempInputPorts.get(i); - if (inputPort instanceof DynamicInputPort) { - ((DynamicInputPort<?>) inputPort).setIndex(i); - } - } - inputPorts = tempInputPorts.toArray(new InputPort[0]); - - firePortRemoved(removedInputPort); - } - - private void firePortRemoved(final InputPort<?> removedInputPort) { - for (InputPortRemovedListener listener : inputPortsRemovedListeners) { - listener.onInputPortRemoved(this, removedInputPort); - } + inputPorts.remove(dynamicInputPort); // TODO update setIndex IF it is still used } - protected final void addInputPortRemovedListener(final InputPortRemovedListener outputPortRemovedListener) { - inputPortsRemovedListeners.add(outputPortRemovedListener); + protected final void addInputPortRemovedListener(final PortRemovedListener<InputPort<?>> inputPortRemovedListener) { + inputPorts.addPortRemovedListener(inputPortRemovedListener); } } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index b69624ce..a4668f08 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -57,7 +57,7 @@ final class ConfigurationContext { * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) */ final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - if (sourcePort.getOwningStage().getInputPorts().length == 0) { + if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (!threadableStages.containsKey(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index 84b4b419..acc8e990 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -22,7 +22,7 @@ public class DynamicActuator { */ @Deprecated public AbstractRunnableStage wrap(final Stage stage) { - if (stage.getInputPorts().length > 0) { + if (stage.getInputPorts().size() > 0) { return new RunnableConsumerStage(stage); } return new RunnableProducerStage(stage); diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index b7337be1..5c1ee1c4 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -122,9 +122,9 @@ public abstract class Stage { this.owningThread = owningThread; } - protected abstract InputPort<?>[] getInputPorts(); + protected abstract List<InputPort<?>> getInputPorts(); - protected abstract OutputPort<?>[] getOutputPorts(); + protected abstract List<OutputPort<?>> getOutputPorts(); // events diff --git a/src/main/java/teetime/framework/Traversor.java b/src/main/java/teetime/framework/Traversor.java index 7621cd06..f8a55067 100644 --- a/src/main/java/teetime/framework/Traversor.java +++ b/src/main/java/teetime/framework/Traversor.java @@ -35,8 +35,7 @@ public class Traversor { return; } - OutputPort<?>[] outputPorts = stage.getOutputPorts(); - for (OutputPort<?> outputPort : outputPorts) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { IPipe pipe = outputPort.getPipe(); if (null != pipe && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) { Stage owningStage = pipe.getTargetPort().getOwningStage(); diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java index 8a64cf87..13ecf747 100644 --- a/src/main/java/teetime/framework/signal/ISignal.java +++ b/src/main/java/teetime/framework/signal/ISignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -25,5 +26,6 @@ public interface ISignal { void trigger(Stage stage); // Only used by the merger so far - boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, InputPort<?>[] allInputPorts); + boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, List<InputPort<?>> allInputPorts); + } diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java index 93825533..5de5d00f 100644 --- a/src/main/java/teetime/framework/signal/InitializingSignal.java +++ b/src/main/java/teetime/framework/signal/InitializingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,7 +34,7 @@ public final class InitializingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index 0554379e..375952f0 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,7 +34,7 @@ public final class StartingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index ad7ee82d..5823902b 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -15,6 +15,7 @@ */ package teetime.framework.signal; +import java.util.List; import java.util.Set; import teetime.framework.InputPort; @@ -33,8 +34,8 @@ public final class TerminatingSignal extends AbstractSignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { - return receivedInputPorts.size() == allInputPorts.length; + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { + return receivedInputPorts.size() == allInputPorts.size(); } } diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java index 599b530d..f85b4ada 100644 --- a/src/main/java/teetime/framework/signal/ValidatingSignal.java +++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java @@ -37,7 +37,7 @@ public final class ValidatingSignal implements ISignal { } @Override - public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) { + public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { return true; } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index cae21532..53e20270 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -15,6 +15,8 @@ */ package teetime.stage.basic.distributor; +import java.util.List; + import teetime.framework.AbstractConsumerStage; import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; @@ -42,10 +44,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> { addOutputPortRemovedListener(strategy); } - @SuppressWarnings("unchecked") @Override protected void execute(final T element) { - this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); + this.strategy.distribute(this.getOutputPorts(), element); } public DynamicOutputPort<T> getNewOutputPort() { @@ -61,7 +62,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> { } @Override - public OutputPort<?>[] getOutputPorts() { + public List<OutputPort<?>> getOutputPorts() { // make public return super.getOutputPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index aa91d05b..a05b4882 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -19,16 +19,15 @@ import java.util.concurrent.BlockingQueue; import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; -import teetime.framework.OutputPortRemovedListener; -import teetime.framework.Stage; import teetime.framework.signal.TerminatingSignal; import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; import teetime.util.framework.port.PortAction; import teetime.util.framework.port.PortActionHelper; +import teetime.util.framework.port.PortRemovedListener; -public class DynamicDistributor<T> extends Distributor<T> implements OutputPortRemovedListener { +public class DynamicDistributor<T> extends Distributor<T> implements PortRemovedListener<OutputPort<?>> { protected final BlockingQueue<PortAction<DynamicDistributor<T>>> portActions; @@ -66,7 +65,7 @@ public class DynamicDistributor<T> extends Distributor<T> implements OutputPortR } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { removedOutputPort.sendSignal(new TerminatingSignal()); } } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java index c1635598..94b85ec1 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/CloneStrategy.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import teetime.framework.OutputPort; -import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -31,10 +30,12 @@ import teetime.framework.Stage; */ public final class CloneStrategy implements IDistributorStrategy { + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> outputPort : outputPorts) { - outputPort.send(clone(element)); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + for (final OutputPort<?> outputPort : outputPorts) { + T clonedElement = clone(element); + ((OutputPort<T>) outputPort).send(clonedElement); } return true; @@ -111,7 +112,7 @@ public final class CloneStrategy implements IDistributorStrategy { } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { // do nothing } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java index fc21b060..f8ef52a2 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/CopyByReferenceStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; /** * @author Nils Christian Ehmke @@ -25,17 +26,19 @@ import teetime.framework.Stage; */ public final class CopyByReferenceStrategy implements IDistributorStrategy { + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> outputPort : outputPorts) { - outputPort.send(element); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + for (final OutputPort<?> outputPort : outputPorts) { + ((OutputPort<T>) outputPort).send(element); } return true; } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { + public void onPortRemoved(final OutputPort<?> removedOutputPort) { // do nothing } + } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java index b04fd172..c81d9c4c 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/IDistributorStrategy.java @@ -15,16 +15,18 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.OutputPortRemovedListener; +import teetime.util.framework.port.PortRemovedListener; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public interface IDistributorStrategy extends OutputPortRemovedListener { +public interface IDistributorStrategy extends PortRemovedListener<OutputPort<?>> { - public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element); } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java index cbb0f3b0..4c5882ab 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.basic.distributor.Distributor; /** @@ -28,28 +29,29 @@ public final class RoundRobinStrategy implements IDistributorStrategy { private int index; + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + final OutputPort<T> outputPort = (OutputPort<T>) this.getNextPortInRoundRobinOrder(outputPorts); outputPort.send(element); return true; } - private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { - final OutputPort<T> outputPort = outputPorts[this.index]; + private OutputPort<?> getNextPortInRoundRobinOrder(final List<OutputPort<?>> outputPorts) { + final OutputPort<?> outputPort = outputPorts.get(this.index); - this.index = (this.index + 1) % outputPorts.length; + this.index = (this.index + 1) % outputPorts.size(); return outputPort; } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { - Distributor<?> distributor = (Distributor<?>) stage; + public void onPortRemoved(final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = this.index % distributor.getOutputPorts().length; + this.index = this.index % distributor.getOutputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java index df81ac0d..9abe0b0a 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.distributor.strategy; +import java.util.List; + import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.basic.distributor.Distributor; /** @@ -29,15 +30,16 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { private int index; private int numWaits; + @SuppressWarnings("unchecked") @Override - public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - final int numOutputPorts = outputPorts.length; + public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) { + final int numOutputPorts = outputPorts.size(); int numLoops = numOutputPorts; boolean success; OutputPort<T> outputPort; do { - outputPort = getNextPortInRoundRobinOrder(outputPorts); + outputPort = (OutputPort<T>) getNextPortInRoundRobinOrder(outputPorts); success = outputPort.sendNonBlocking(element); if (0 == numLoops) { numWaits++; @@ -60,10 +62,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { // Thread.yield(); } - private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { - final OutputPort<T> outputPort = outputPorts[this.index]; + private OutputPort<?> getNextPortInRoundRobinOrder(final List<OutputPort<?>> outputPorts) { + final OutputPort<?> outputPort = outputPorts.get(this.index); - this.index = (this.index + 1) % outputPorts.length; + this.index = (this.index + 1) % outputPorts.size(); return outputPort; } @@ -73,10 +75,10 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { } @Override - public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) { - Distributor<?> distributor = (Distributor<?>) stage; + public void onPortRemoved(final OutputPort<?> removedOutputPort) { + Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = this.index % distributor.getOutputPorts().length; + this.index = this.index % distributor.getOutputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index c82d290b..2a28fed8 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -17,6 +17,7 @@ package teetime.stage.basic.merger; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -42,10 +43,10 @@ import teetime.stage.basic.merger.strategy.RoundRobinStrategy; */ public class Merger<T> extends AbstractStage { + private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; private final OutputPort<T> outputPort = this.createOutputPort(); private final IMergerStrategy strategy; - private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; public Merger() { this(new RoundRobinStrategy()); @@ -106,10 +107,9 @@ public class Merger<T> extends AbstractStage { return this.strategy; } - @SuppressWarnings("unchecked") @Override - public InputPort<T>[] getInputPorts() { // make public - return (InputPort<T>[]) super.getInputPorts(); + public List<InputPort<?>> getInputPorts() { // make public + return super.getInputPorts(); } public DynamicInputPort<T> getNewInputPort() { diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java index 334b8c4f..d3fc845a 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -15,6 +15,8 @@ */ package teetime.stage.basic.merger.dynamic; +import java.util.List; + import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.util.framework.port.PortAction; @@ -34,8 +36,8 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { if (null == inputPort) { // for testing purposes only - InputPort<?>[] inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); - inputPortsToRemove = inputPorts[inputPorts.length - 1]; + List<InputPort<?>> inputPorts = ((DynamicMerger<?>) dynamicMerger).getInputPorts(); + inputPortsToRemove = inputPorts.get(inputPorts.size() - 1); } else { inputPortsToRemove = inputPort; } diff --git a/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java index 49a73b0a..7bddd425 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/BusyWaitingRoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.merger.strategy; +import java.util.List; + import teetime.framework.InputPort; -import teetime.framework.Stage; import teetime.stage.basic.merger.Merger; /** @@ -30,39 +31,40 @@ public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy { @Override public <T> T getNextInput(final Merger<T> merger) { - final InputPort<T>[] inputPorts = merger.getInputPorts(); - final InputPort<T> inputPort = getOpenInputPort(inputPorts); + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + final InputPort<?> inputPort = getOpenInputPort(inputPorts); if (null == inputPort) { return null; } - final T token = inputPort.receive(); + @SuppressWarnings("unchecked") + final T token = (T) inputPort.receive(); if (null != token) { - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); } return token; } - private <T> InputPort<T> getOpenInputPort(final InputPort<T>[] inputPorts) { + private InputPort<?> getOpenInputPort(final List<InputPort<?>> inputPorts) { final int startedIndex = index; - InputPort<T> inputPort = inputPorts[this.index]; + InputPort<?> inputPort = inputPorts.get(this.index); while (inputPort.isClosed()) { - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); if (index == startedIndex) { return null; } - inputPort = inputPorts[this.index]; + inputPort = inputPorts.get(this.index); } return inputPort; } @Override - public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { - Merger<?> merger = (Merger<?>) stage; + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = (this.index + 1) % merger.getInputPorts().length; + this.index = (this.index + 1) % merger.getInputPorts().size(); } } diff --git a/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java index 428f9f99..6d4e6dbc 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/IMergerStrategy.java @@ -15,15 +15,16 @@ */ package teetime.stage.basic.merger.strategy; -import teetime.framework.InputPortRemovedListener; +import teetime.framework.InputPort; import teetime.stage.basic.merger.Merger; +import teetime.util.framework.port.PortRemovedListener; /** - * @author Nils Christian Ehmke + * @author Nils Christian Ehmke, Christian Wulf * * @since 1.0 */ -public interface IMergerStrategy extends InputPortRemovedListener { +public interface IMergerStrategy extends PortRemovedListener<InputPort<?>> { public <T> T getNextInput(Merger<T> merger); diff --git a/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java index d67b1b0f..9af8305e 100644 --- a/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java +++ b/src/main/java/teetime/stage/basic/merger/strategy/RoundRobinStrategy.java @@ -15,8 +15,9 @@ */ package teetime.stage.basic.merger.strategy; +import java.util.List; + import teetime.framework.InputPort; -import teetime.framework.Stage; import teetime.stage.basic.merger.Merger; /** @@ -30,12 +31,13 @@ public final class RoundRobinStrategy implements IMergerStrategy { @Override public <T> T getNextInput(final Merger<T> merger) { - final InputPort<T>[] inputPorts = merger.getInputPorts(); - int size = inputPorts.length; + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + int size = inputPorts.size(); // check each port at most once to avoid a potentially infinite loop while (size-- > 0) { - InputPort<T> inputPort = this.getNextPortInRoundRobinOrder(inputPorts); - final T token = inputPort.receive(); + InputPort<?> inputPort = this.getNextPortInRoundRobinOrder(inputPorts); + @SuppressWarnings("unchecked") + final T token = (T) inputPort.receive(); if (token != null) { return token; } @@ -43,19 +45,19 @@ public final class RoundRobinStrategy implements IMergerStrategy { return null; } - private <T> InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) { - InputPort<T> inputPort = inputPorts[this.index]; + private InputPort<?> getNextPortInRoundRobinOrder(final List<InputPort<?>> inputPorts) { + InputPort<?> inputPort = inputPorts.get(this.index); - this.index = (this.index + 1) % inputPorts.length; + this.index = (this.index + 1) % inputPorts.size(); return inputPort; } @Override - public void onInputPortRemoved(final Stage stage, final InputPort<?> removedInputPort) { - Merger<?> merger = (Merger<?>) stage; + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = (this.index + 1) % merger.getInputPorts().length; + this.index = (this.index + 1) % merger.getInputPorts().size(); } } diff --git a/src/main/java/teetime/util/framework/port/PortList.java b/src/main/java/teetime/util/framework/port/PortList.java new file mode 100644 index 00000000..a7d14053 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortList.java @@ -0,0 +1,62 @@ +package teetime.util.framework.port; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import teetime.framework.AbstractPort; + +public class PortList<T extends AbstractPort<?>> { + + private final List<T> openedPorts = new ArrayList<T>(); + + // private final List<T> closedPorts = new ArrayList<T>(); + + private final Set<PortRemovedListener<T>> portsRemovedListeners = new HashSet<PortRemovedListener<T>>(); + + public List<T> getOpenedPorts() { + return openedPorts; + } + + // public List<T> getClosedPorts() { + // return closedPorts; + // } + + public boolean add(final T port) { + return openedPorts.add(port); + } + + public boolean remove(final T port) { + boolean removed = openedPorts.remove(port); // BETTER remove by index for performance reasons + firePortRemoved(port); + return removed; + } + + public boolean close(final T port) { + boolean removed = remove(port); + // if (removed) { + // boolean added = closedPorts.add(port); + // if (added) { + // return true; + // } + // openedPorts.add(port); + // } + return removed; + } + + public int size() { + return openedPorts.size(); + } + + private void firePortRemoved(final T removedPort) { + for (PortRemovedListener<T> listener : portsRemovedListeners) { + listener.onPortRemoved(removedPort); + } + } + + public void addPortRemovedListener(final PortRemovedListener<T> listener) { + portsRemovedListeners.add(listener); + } + +} diff --git a/src/main/java/teetime/util/framework/port/PortRemovedListener.java b/src/main/java/teetime/util/framework/port/PortRemovedListener.java new file mode 100644 index 00000000..e716f459 --- /dev/null +++ b/src/main/java/teetime/util/framework/port/PortRemovedListener.java @@ -0,0 +1,8 @@ +package teetime.util.framework.port; + +import teetime.framework.AbstractPort; + +public interface PortRemovedListener<T extends AbstractPort<?>> { + + void onPortRemoved(T removedPort); +} diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 60ace19a..35938bb5 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -44,11 +44,14 @@ public class TraversorTest { TestConfiguration tc = new TestConfiguration(); new Execution<TestConfiguration>(tc); traversor.traverse(tc.init); + Set<Stage> comparingStages = new HashSet<Stage>(); comparingStages.add(tc.init); comparingStages.add(tc.f2b); comparingStages.add(tc.distributor); - assertThat(tc.distributor.getOwningThread(), is(not(tc.distributor.getOutputPorts()[0].pipe.getTargetPort().getOwningStage().getOwningThread()))); + + OutputPort<?> distributorOutputPort0 = tc.distributor.getOutputPorts().get(0); + assertThat(tc.distributor.getOwningThread(), is(not(distributorOutputPort0.pipe.getTargetPort().getOwningStage().getOwningThread()))); assertEquals(comparingStages, traversor.getVisitedStage()); } -- GitLab