From 0702941176cc544ca8382460164c16fbff31c077 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Wed, 2 Jul 2014 18:39:50 +0200 Subject: [PATCH] added generic distributor --- .../basic/distributor/CloneStrategy.java | 34 ++++++++ .../distributor/CopyByReferenceStrategy.java | 38 ++++++++ .../stage/basic/distributor/Distributor.java | 86 +++++++++++++++++++ .../distributor/IDistributorStrategy.java | 31 +++++++ .../basic/distributor/RoundRobinStrategy.java | 47 ++++++++++ 5 files changed, 236 insertions(+) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java new file mode 100644 index 00000000..523b150e --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java @@ -0,0 +1,34 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.basic.distributor; + +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.10 + */ +public final class CloneStrategy<T> implements IDistributorStrategy<T> { + + @Override + public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java new file mode 100644 index 00000000..0fa5efc0 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java @@ -0,0 +1,38 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.basic.distributor; + +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.10 + */ +public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> { + + @Override + public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + for (final OutputPort<T> port : outputPorts) { + port.send(element); + } + + return true; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java new file mode 100644 index 00000000..8fc677ae --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java @@ -0,0 +1,86 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.basic.distributor; + +import java.util.ArrayList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; + +/** + * @author Christian Wulf + * + * @since 1.10 + * + * @param T + * the type of the input port and the output ports + */ +public class Distributor<T> extends AbstractStage<T, T> { + + // TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor ConsumerStage<T, T> { + + // BETTER use an array since a list always creates a new iterator when looping + private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>(); + + private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>(); + + public IDistributorStrategy<T> getStrategy() { + return this.strategy; + } + + public void setStrategy(final IDistributorStrategy<T> strategy) { + this.strategy = strategy; + } + + @Override + protected void execute5(final T element) { + this.strategy.distribute(this.outputPortList, element); + } + + @Override + public void onIsPipelineHead() { + for (OutputPort<T> op : this.outputPortList) { + op.getPipe().close(); + System.out.println("End signal sent, size: " + op.getPipe().size()); + } + } + + @Override + public OutputPort<T> getOutputPort() { + return this.getNewOutputPort(); + } + + public OutputPort<T> getNewOutputPort() { + final OutputPort<T> newOutputPort = new OutputPort<T>(); + this.outputPortList.add(newOutputPort); + return newOutputPort; + } + + public List<OutputPort<T>> getOutputPortList() { + return this.outputPortList; + } + + @Override + public void executeWithPorts() { + T element = this.getInputPort().receive(); + + this.setReschedulable(this.getInputPort().getPipe().size() > 0); + + this.execute5(element); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java new file mode 100644 index 00000000..3906263d --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java @@ -0,0 +1,31 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.basic.distributor; + +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.10 + */ +public interface IDistributorStrategy<T> { + + public boolean distribute(final List<OutputPort<T>> allOutputPorts, final T element); + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java new file mode 100644 index 00000000..13b27fa3 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java @@ -0,0 +1,47 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.basic.distributor; + +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; + +/** + * @author Nils Christian Ehmke + * + * @since 1.10 + */ +public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { + + private int index = 0; + + @Override + public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); + outputPort.send(element); + + return true; + } + + private OutputPort<T> getNextPortInRoundRobinOrder(final List<OutputPort<T>> outputPorts) { + final OutputPort<T> outputPort = outputPorts.get(this.index); + + this.index = (this.index + 1) % outputPorts.size(); + + return outputPort; + } + +} -- GitLab