Skip to content
Snippets Groups Projects
Commit bbb2e1c0 authored by Christian Wulf's avatar Christian Wulf
Browse files

removed dynamic port classes

parent 3503de1e
No related branches found
No related tags found
No related merge requests found
Showing
with 53 additions and 145 deletions
#FindBugs User Preferences
#Mon Jul 06 14:06:35 CEST 2015
#Fri Jul 10 13:06:00 CEST 2015
detector_threshold=2
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
......@@ -283,21 +283,21 @@ public abstract class AbstractStage extends Stage {
return TerminationStrategy.BY_SIGNAL;
}
protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size());
outputPorts.add(outputPort);
return outputPort;
}
protected <T> DynamicInputPort<T> createDynamicInputPort() {
final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size());
inputPorts.add(inputPort);
return inputPort;
}
// protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
// final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.size());
// outputPorts.add(outputPort);
// return outputPort;
// }
// protected <T> DynamicInputPort<T> createDynamicInputPort() {
// final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.size());
// inputPorts.add(inputPort);
// return inputPort;
// }
@Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
outputPorts.remove(dynamicOutputPort); // TODO update setIndex IF it is still used
protected void removeDynamicPort(final OutputPort<?> outputPort) {
outputPorts.remove(outputPort); // TODO update setIndex IF it is still used
}
protected final void addOutputPortRemovedListener(final PortRemovedListener<OutputPort<?>> outputPortRemovedListener) {
......@@ -305,8 +305,8 @@ public abstract class AbstractStage extends Stage {
}
@Override
protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) {
inputPorts.remove(dynamicInputPort); // TODO update setIndex IF it is still used
protected void removeDynamicPort(final InputPort<?> inputPort) {
inputPorts.remove(inputPort); // TODO update setIndex IF it is still used
}
protected final void addInputPortRemovedListener(final PortRemovedListener<InputPort<?>> inputPortRemovedListener) {
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be received
*
* @since 1.2
*/
public final class DynamicInputPort<T> extends InputPort<T> {
private int index;
DynamicInputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage, null);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be sent
*
* @since 1.2
*/
public class DynamicOutputPort<T> extends OutputPort<T> {
private int index;
protected DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage, null);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
......@@ -54,11 +54,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
private void checkForTerminationSignal(final Stage stage) {
System.out.println("checkForTerminationSignal: " + stage);
// FIXME should getInputPorts() really be defined in Stage?
for (InputPort<?> inputPort : stage.getInputPorts()) {
System.out.println("\tclosed: " + inputPort.isClosed() + " (" + inputPort);
if (!inputPort.isClosed()) {
if (inputPort.isClosed()) {
// stage.removeDynamicPort(inputPort);
} else {
return;
}
}
......
......@@ -150,8 +150,8 @@ public abstract class Stage {
this.exceptionHandler = exceptionHandler;
}
protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort);
protected abstract void removeDynamicPort(OutputPort<?> outputPort);
protected abstract void removeDynamicPort(DynamicInputPort<?> dynamicInputPort);
protected abstract void removeDynamicPort(InputPort<?> inputPort);
}
......@@ -18,7 +18,6 @@ package teetime.stage.basic.distributor;
import java.util.List;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.strategy.IDistributorStrategy;
import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2;
......@@ -49,8 +48,8 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
this.strategy.distribute(this.getOutputPorts(), element);
}
public DynamicOutputPort<T> getNewOutputPort() {
return this.createDynamicOutputPort();
public OutputPort<T> getNewOutputPort() { // make public
return this.createOutputPort();
}
public IDistributorStrategy getStrategy() {
......
......@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.List;
import teetime.framework.DynamicActuator;
import teetime.framework.DynamicOutputPort;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
......@@ -41,13 +41,13 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
DynamicOutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort();
OutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort();
processOutputPort(newOutputPort);
onOutputPortCreated(dynamicDistributor, newOutputPort);
}
private void processOutputPort(final DynamicOutputPort<T> newOutputPort) {
private void processOutputPort(final OutputPort<T> newOutputPort) {
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage());
......@@ -58,7 +58,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
}
private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final DynamicOutputPort<T> newOutputPort) {
private void onOutputPortCreated(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) {
for (PortActionListener<T> listener : listeners) {
listener.onOutputPortCreated(dynamicDistributor, newOutputPort);
}
......
......@@ -17,7 +17,6 @@ package teetime.stage.basic.distributor.dynamic;
import java.util.concurrent.BlockingQueue;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.TerminatingSignal;
import teetime.stage.basic.distributor.Distributor;
......@@ -56,8 +55,8 @@ public class DynamicDistributor<T> extends Distributor<T> implements PortRemoved
}
@Override
public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) { // make public
super.removeDynamicPort(dynamicOutputPort);
public void removeDynamicPort(final OutputPort<?> outputPort) { // make public
super.removeDynamicPort(outputPort);
}
public boolean addPortActionRequest(final PortAction<DynamicDistributor<T>> newPortActionRequest) {
......
......@@ -15,9 +15,9 @@
*/
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
public interface PortActionListener<T> {
void onOutputPortCreated(DynamicDistributor<T> distributor, DynamicOutputPort<T> port);
void onOutputPortCreated(DynamicDistributor<T> distributor, OutputPort<T> port);
}
......@@ -15,14 +15,14 @@
*/
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.util.framework.port.PortAction;
public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> {
private final DynamicOutputPort<T> outputPort;
private final OutputPort<T> outputPort;
public RemovePortAction(final DynamicOutputPort<T> outputPort) {
public RemovePortAction(final OutputPort<T> outputPort) {
if (null == outputPort) {
throw new IllegalArgumentException("outputPort may not be null");
}
......
......@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Set;
import teetime.framework.AbstractStage;
import teetime.framework.DynamicInputPort;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
......@@ -112,8 +111,8 @@ public class Merger<T> extends AbstractStage {
return super.getInputPorts();
}
public DynamicInputPort<T> getNewInputPort() {
return this.createDynamicInputPort();
public InputPort<T> getNewInputPort() {
return this.createInputPort();
}
public OutputPort<T> getOutputPort() {
......
......@@ -17,7 +17,7 @@ package teetime.stage.basic.merger.dynamic;
import java.util.concurrent.BlockingQueue;
import teetime.framework.DynamicInputPort;
import teetime.framework.InputPort;
import teetime.stage.basic.merger.Merger;
import teetime.stage.basic.merger.strategy.IMergerStrategy;
import teetime.util.framework.port.PortAction;
......@@ -43,8 +43,8 @@ public class DynamicMerger<T> extends Merger<T> {
}
@Override
public void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) { // make public
super.removeDynamicPort(dynamicInputPort);
public void removeDynamicPort(final InputPort<?> inputPort) { // make public
super.removeDynamicPort(inputPort);
}
public boolean addPortActionRequest(final PortAction<DynamicMerger<T>> newPortActionRequest) {
......
......@@ -17,15 +17,14 @@ package teetime.stage.basic.merger.dynamic;
import java.util.List;
import teetime.framework.DynamicInputPort;
import teetime.framework.InputPort;
import teetime.util.framework.port.PortAction;
public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> {
private final DynamicInputPort<T> inputPort;
private final InputPort<T> inputPort;
public RemovePortAction(final DynamicInputPort<T> inputPort) {
public RemovePortAction(final InputPort<T> inputPort) {
super();
this.inputPort = inputPort;
}
......@@ -42,6 +41,6 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> {
inputPortsToRemove = inputPort;
}
dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove);
dynamicMerger.removeDynamicPort(inputPortsToRemove);
}
}
......@@ -26,8 +26,8 @@ import java.util.List;
import org.junit.Test;
import teetime.framework.Configuration;
import teetime.framework.DynamicOutputPort;
import teetime.framework.Execution;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -100,7 +100,7 @@ public class DynamicDistributorTest {
assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5));
assertValuesForIndex(inputActions[0], Collections.<Integer> emptyList());
assertValuesForIndex(inputActions[2], Arrays.asList(3));
assertValuesForIndex(inputActions[2], Arrays.asList(3)); // FIXME fails sometimes
assertValuesForIndex(inputActions[3], Collections.<Integer> emptyList());
}
......@@ -109,7 +109,7 @@ public class DynamicDistributorTest {
CreatePortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort());
portAction.addPortActionListener(new PortActionListener<Integer>() {
@Override
public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final DynamicOutputPort<Integer> port) {
public void onOutputPortCreated(final DynamicDistributor<Integer> distributor, final OutputPort<Integer> port) {
portContainer.setPort(port);
}
});
......@@ -122,7 +122,7 @@ public class DynamicDistributorTest {
@SuppressWarnings("unchecked")
CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
assertThat(collectorSink.getElements(), is(values));
assertThat(collectorSink.getElements(), is(values)); // FIXME fails sometimes with a ConcurrentModificationException
}
private static class DynamicDistributorTestConfig<T> extends Configuration {
......
......@@ -15,7 +15,7 @@
*/
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
/**
* Represents a container that eventually holds the output port that a {@link RemovePortAction} can use.
......@@ -26,15 +26,15 @@ import teetime.framework.DynamicOutputPort;
*/
final class PortContainer<T> {
private DynamicOutputPort<T> port;
private OutputPort<T> port;
PortContainer() {}
public void setPort(final DynamicOutputPort<T> port) {
public void setPort(final OutputPort<T> port) {
this.port = port;
}
public DynamicOutputPort<T> getPort() {
public OutputPort<T> getPort() {
return port;
}
......
......@@ -15,7 +15,7 @@
*/
package teetime.stage.basic.distributor.dynamic;
import teetime.framework.DynamicOutputPort;
import teetime.framework.OutputPort;
import teetime.util.framework.port.PortAction;
/**
......@@ -35,8 +35,8 @@ public class RemovePortActionDelegation<T> implements PortAction<DynamicDistribu
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
DynamicOutputPort<?> dynamicOutputPort = portContainer.getPort();
dynamicDistributor.removeDynamicPort(dynamicOutputPort);
OutputPort<?> outputPort = portContainer.getPort();
dynamicDistributor.removeDynamicPort(outputPort);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment