Skip to content
Snippets Groups Projects
Commit 07029411 authored by Christian Wulf's avatar Christian Wulf
Browse files

added generic distributor

parent 0e3067c9
No related branches found
No related tags found
No related merge requests found
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.distributor;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public final class CloneStrategy<T> implements IDistributorStrategy<T> {
@Override
public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) {
throw new UnsupportedOperationException();
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.distributor;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> {
@Override
public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) {
for (final OutputPort<T> port : outputPorts) {
port.send(element);
}
return true;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.distributor;
import java.util.ArrayList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Christian Wulf
*
* @since 1.10
*
* @param T
* the type of the input port and the output ports
*/
public class Distributor<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor ConsumerStage<T, T> {
// BETTER use an array since a list always creates a new iterator when looping
private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>();
private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
@Override
protected void execute5(final T element) {
this.strategy.distribute(this.outputPortList, element);
}
@Override
public void onIsPipelineHead() {
for (OutputPort<T> op : this.outputPortList) {
op.getPipe().close();
System.out.println("End signal sent, size: " + op.getPipe().size());
}
}
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
}
public OutputPort<T> getNewOutputPort() {
final OutputPort<T> newOutputPort = new OutputPort<T>();
this.outputPortList.add(newOutputPort);
return newOutputPort;
}
public List<OutputPort<T>> getOutputPortList() {
return this.outputPortList;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.distributor;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public interface IDistributorStrategy<T> {
public boolean distribute(final List<OutputPort<T>> allOutputPorts, final T element);
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.distributor;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
private int index = 0;
@Override
public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) {
final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
outputPort.send(element);
return true;
}
private OutputPort<T> getNextPortInRoundRobinOrder(final List<OutputPort<T>> outputPorts) {
final OutputPort<T> outputPort = outputPorts.get(this.index);
this.index = (this.index + 1) % outputPorts.size();
return outputPort;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment