diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 6102e83fb246be6053274f4272509ce91692951e..b8d923bd0097c8006731e42503e1003cd1e197bd 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Mon Jul 06 14:06:35 CEST 2015 +#Fri Jul 10 13:06:00 CEST 2015 detector_threshold=2 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index e48efb0651d41cd6f0e86911cc6f630e5dab2d07..22a467ce825cc1143fe1bc8ffb43c5fc4df57731 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -283,21 +283,21 @@ public abstract class AbstractStage extends Stage { return TerminationStrategy.BY_SIGNAL; } - protected <T> DynamicOutputPort<T> createDynamicOutputPort() { - final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size()); - outputPorts.add(outputPort); - return outputPort; - } - - protected <T> DynamicInputPort<T> createDynamicInputPort() { - final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size()); - inputPorts.add(inputPort); - return inputPort; - } + // protected <T> DynamicOutputPort<T> createDynamicOutputPort() { + // final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size()); + // outputPorts.add(outputPort); + // return outputPort; + // } + + // protected <T> DynamicInputPort<T> createDynamicInputPort() { + // final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size()); + // inputPorts.add(inputPort); + // return inputPort; + // } @Override - protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { - outputPorts.remove(dynamicOutputPort); // TODO update setIndex IF it is still used + protected void removeDynamicPort(final OutputPort<?> outputPort) { + outputPorts.remove(outputPort); // TODO update setIndex IF it is still used } protected final void addOutputPortRemovedListener(final PortRemovedListener<OutputPort<?>> outputPortRemovedListener) { @@ -305,8 +305,8 @@ public abstract class AbstractStage extends Stage { } @Override - protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { - inputPorts.remove(dynamicInputPort); // TODO update setIndex IF it is still used + protected void removeDynamicPort(final InputPort<?> inputPort) { + inputPorts.remove(inputPort); // TODO update setIndex IF it is still used } protected final void addInputPortRemovedListener(final PortRemovedListener<InputPort<?>> inputPortRemovedListener) { diff --git a/src/main/java/teetime/framework/DynamicInputPort.java b/src/main/java/teetime/framework/DynamicInputPort.java deleted file mode 100644 index 1666d9cd65c0d05d86cfc6f2a5372b0d80a6d236..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/DynamicInputPort.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -/** - * - * @author Christian Wulf - * - * @param <T> - * the type of elements to be received - * - * @since 1.2 - */ -public final class DynamicInputPort<T> extends InputPort<T> { - - private int index; - - DynamicInputPort(final Class<T> type, final Stage owningStage, final int index) { - super(type, owningStage, null); - this.index = index; - } - - public int getIndex() { - return index; - } - - public void setIndex(final int index) { - this.index = index; - } - -} diff --git a/src/main/java/teetime/framework/DynamicOutputPort.java b/src/main/java/teetime/framework/DynamicOutputPort.java deleted file mode 100644 index a84bd9c4ec6c23eea75ceb1564c134c341cd0c32..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/DynamicOutputPort.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework; - -/** - * - * @author Christian Wulf - * - * @param <T> - * the type of elements to be sent - * - * @since 1.2 - */ -public class DynamicOutputPort<T> extends OutputPort<T> { - - private int index; - - protected DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) { - super(type, owningStage, null); - this.index = index; - } - - public int getIndex() { - return index; - } - - public void setIndex(final int index) { - this.index = index; - } - -} diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 6facffe8fcfe7ac78ab7a5a6eef5be2c605461eb..7af5b4d6f24a3dc28b1bf7d6bbfc76d2b910c6ea 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -54,11 +54,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } private void checkForTerminationSignal(final Stage stage) { - System.out.println("checkForTerminationSignal: " + stage); // FIXME should getInputPorts() really be defined in Stage? for (InputPort<?> inputPort : stage.getInputPorts()) { - System.out.println("\tclosed: " + inputPort.isClosed() + " (" + inputPort); - if (!inputPort.isClosed()) { + if (inputPort.isClosed()) { + // stage.removeDynamicPort(inputPort); + } else { return; } } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 5c1ee1c432b724c688dbb30a4da10bdbcfddd762..5979b9ebbf8fb30ec397ba97c5ef66e17fa0969d 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -150,8 +150,8 @@ public abstract class Stage { this.exceptionHandler = exceptionHandler; } - protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort); + protected abstract void removeDynamicPort(OutputPort<?> outputPort); - protected abstract void removeDynamicPort(DynamicInputPort<?> dynamicInputPort); + protected abstract void removeDynamicPort(InputPort<?> inputPort); } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index 53e2027083729588210703487d4f0fd18d088c7b..fe7aee22e637c533df52d90f2ee7c0078a2dcf7e 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -18,7 +18,6 @@ package teetime.stage.basic.distributor; import java.util.List; import teetime.framework.AbstractConsumerStage; -import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; @@ -49,8 +48,8 @@ public class Distributor<T> extends AbstractConsumerStage<T> { this.strategy.distribute(this.getOutputPorts(), element); } - public DynamicOutputPort<T> getNewOutputPort() { - return this.createDynamicOutputPort(); + public OutputPort<T> getNewOutputPort() { // make public + return this.createOutputPort(); } public IDistributorStrategy getStrategy() { diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 1e04ca08fc7a7e9b469a7611ee26f4dc4946698a..92c3db23c3708b9137903eaa1c44305df4a7666b 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -19,8 +19,8 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.DynamicActuator; -import teetime.framework.DynamicOutputPort; import teetime.framework.InputPort; +import teetime.framework.OutputPort; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; @@ -41,13 +41,13 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - DynamicOutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort(); + OutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort(); processOutputPort(newOutputPort); onOutputPortCreated(dynamicDistributor, newOutputPort); } - private void processOutputPort(final DynamicOutputPort<T> newOutputPort) { + private void processOutputPort(final OutputPort<T> newOutputPort) { INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); @@ -58,7 +58,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { // FIXME pass the new thread to the analysis so that it can terminate the thread at the end } - private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final DynamicOutputPort<T> newOutputPort) { + private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { for (PortActionListener<T> listener : listeners) { listener.onOutputPortCreated(dynamicDistributor, newOutputPort); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java index a05b488265c6ba998148d019a873b85facd7eb4b..ad7f056c6dcb2abf4aa7a8e81374c2109cd032cf 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java @@ -17,7 +17,6 @@ package teetime.stage.basic.distributor.dynamic; import java.util.concurrent.BlockingQueue; -import teetime.framework.DynamicOutputPort; import teetime.framework.OutputPort; import teetime.framework.signal.TerminatingSignal; import teetime.stage.basic.distributor.Distributor; @@ -56,8 +55,8 @@ public class DynamicDistributor<T> extends Distributor<T> implements PortRemoved } @Override - public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { // make public - super.removeDynamicPort(dynamicOutputPort); + public void removeDynamicPort(final OutputPort<?> outputPort) { // make public + super.removeDynamicPort(outputPort); } public boolean addPortActionRequest(final PortAction<DynamicDistributor<T>> newPortActionRequest) { diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java index 875db7e737f9ce8b76d66eb6621828ac0719fe10..d160a14ef850de1c7102303e059fe26949b13fa2 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/PortActionListener.java @@ -15,9 +15,9 @@ */ package teetime.stage.basic.distributor.dynamic; -import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; public interface PortActionListener<T> { - void onOutputPortCreated(DynamicDistributor<T> distributor, DynamicOutputPort<T> port); + void onOutputPortCreated(DynamicDistributor<T> distributor, OutputPort<T> port); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 461d3fdba022d1609d5fba2e52c25cb50d41ccd1..2731b5189f01c69f4ac49045da5cb0cd58d7551a 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -15,14 +15,14 @@ */ package teetime.stage.basic.distributor.dynamic; -import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; import teetime.util.framework.port.PortAction; public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { - private final DynamicOutputPort<T> outputPort; + private final OutputPort<T> outputPort; - public RemovePortAction(final DynamicOutputPort<T> outputPort) { + public RemovePortAction(final OutputPort<T> outputPort) { if (null == outputPort) { throw new IllegalArgumentException("outputPort may not be null"); } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 2a28fed8b470c970b8a4dca3fca8759f067537de..7043b5420b4f4ecc335aa1cfe2d1e4ed40ed1db9 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Set; import teetime.framework.AbstractStage; -import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; @@ -112,8 +111,8 @@ public class Merger<T> extends AbstractStage { return super.getInputPorts(); } - public DynamicInputPort<T> getNewInputPort() { - return this.createDynamicInputPort(); + public InputPort<T> getNewInputPort() { + return this.createInputPort(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java index 6a513491071fce0eeb2a24d5d945dfeb718189d6..5559c105665e7a4ef41d14406f5b65c8e9f8b690 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java @@ -17,7 +17,7 @@ package teetime.stage.basic.merger.dynamic; import java.util.concurrent.BlockingQueue; -import teetime.framework.DynamicInputPort; +import teetime.framework.InputPort; import teetime.stage.basic.merger.Merger; import teetime.stage.basic.merger.strategy.IMergerStrategy; import teetime.util.framework.port.PortAction; @@ -43,8 +43,8 @@ public class DynamicMerger<T> extends Merger<T> { } @Override - public void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { // make public - super.removeDynamicPort(dynamicInputPort); + public void removeDynamicPort(final InputPort<?> inputPort) { // make public + super.removeDynamicPort(inputPort); } public boolean addPortActionRequest(final PortAction<DynamicMerger<T>> newPortActionRequest) { diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java index d3fc845ab3389f767faa4bac6601f0f2cbf10e42..38141b0f9d96928331d8d7baf6b35a3f18f84e04 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java @@ -17,15 +17,14 @@ package teetime.stage.basic.merger.dynamic; import java.util.List; -import teetime.framework.DynamicInputPort; import teetime.framework.InputPort; import teetime.util.framework.port.PortAction; public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { - private final DynamicInputPort<T> inputPort; + private final InputPort<T> inputPort; - public RemovePortAction(final DynamicInputPort<T> inputPort) { + public RemovePortAction(final InputPort<T> inputPort) { super(); this.inputPort = inputPort; } @@ -42,6 +41,6 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> { inputPortsToRemove = inputPort; } - dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove); + dynamicMerger.removeDynamicPort(inputPortsToRemove); } } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 5aa708499b21bbae96495ccdacd8d3652b5a8120..2a8775ec9e70c4ec96a8f2731dd1442859e034a8 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -26,8 +26,8 @@ import java.util.List; import org.junit.Test; import teetime.framework.Configuration; -import teetime.framework.DynamicOutputPort; import teetime.framework.Execution; +import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -100,7 +100,7 @@ public class DynamicDistributorTest { assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5)); assertValuesForIndex(inputActions[0], Collections.<Integer> emptyList()); - assertValuesForIndex(inputActions[2], Arrays.asList(3)); + assertValuesForIndex(inputActions[2], Arrays.asList(3)); // FIXME fails sometimes assertValuesForIndex(inputActions[3], Collections.<Integer> emptyList()); } @@ -109,7 +109,7 @@ public class DynamicDistributorTest { CreatePortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort()); portAction.addPortActionListener(new PortActionListener<Integer>() { @Override - public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final DynamicOutputPort<Integer> port) { + public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final OutputPort<Integer> port) { portContainer.setPort(port); } }); @@ -122,7 +122,7 @@ public class DynamicDistributorTest { @SuppressWarnings("unchecked") CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage; - assertThat(collectorSink.getElements(), is(values)); + assertThat(collectorSink.getElements(), is(values)); // FIXME fails sometimes with a ConcurrentModificationException } private static class DynamicDistributorTestConfig<T> extends Configuration { diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java index 114c59ed33c388934ad8239fbf5cb52d0ce21499..2646e79ca316a121c540007dcb34e186fbb65770 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java @@ -15,7 +15,7 @@ */ package teetime.stage.basic.distributor.dynamic; -import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; /** * Represents a container that eventually holds the output port that a {@link RemovePortAction} can use. @@ -26,15 +26,15 @@ import teetime.framework.DynamicOutputPort; */ final class PortContainer<T> { - private DynamicOutputPort<T> port; + private OutputPort<T> port; PortContainer() {} - public void setPort(final DynamicOutputPort<T> port) { + public void setPort(final OutputPort<T> port) { this.port = port; } - public DynamicOutputPort<T> getPort() { + public OutputPort<T> getPort() { return port; } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/RemovePortActionDelegation.java b/src/test/java/teetime/stage/basic/distributor/dynamic/RemovePortActionDelegation.java index 89cd232975bd6bab22989f731936e4b1626c88ed..52e4e33121871f268d427f7c82c9fe4aedf4e47d 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/RemovePortActionDelegation.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/RemovePortActionDelegation.java @@ -15,7 +15,7 @@ */ package teetime.stage.basic.distributor.dynamic; -import teetime.framework.DynamicOutputPort; +import teetime.framework.OutputPort; import teetime.util.framework.port.PortAction; /** @@ -35,8 +35,8 @@ public class RemovePortActionDelegation<T> implements PortAction<DynamicDistribu @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - DynamicOutputPort<?> dynamicOutputPort = portContainer.getPort(); - dynamicDistributor.removeDynamicPort(dynamicOutputPort); + OutputPort<?> outputPort = portContainer.getPort(); + dynamicDistributor.removeDynamicPort(outputPort); } }