From f86d0ef5b8325221b3ad72495ab46a1da6a9c850 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 18 Jun 2015 12:41:27 +0200 Subject: [PATCH] added first draft of DynamicDistributor --- .../basic/distributor/DynamicDistributor.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java new file mode 100644 index 00000000..a7dc80d7 --- /dev/null +++ b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java @@ -0,0 +1,60 @@ +package teetime.stage.basic.distributor; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.pipe.SpScPipeFactory; + +public class DynamicDistributor<T> extends Distributor<T> { + + private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); + + public enum DynamicPortAction { + CREATE, REMOVE; + } + + public static class DynamicPortActionContainer<T> { + private final DynamicPortAction dynamicPortAction; + private final InputPort<T> inputPort; + + public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) { + super(); + this.dynamicPortAction = dynamicPortAction; + this.inputPort = inputPort; + } + + public DynamicPortAction getDynamicPortAction() { + return dynamicPortAction; + } + + public InputPort<T> getInputPort() { + return inputPort; + } + + } + + @SuppressWarnings("rawtypes") + private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class); + + @SuppressWarnings("unchecked") + @Override + protected void execute(final T element) { + DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive(); + switch (dynamicPortAction.dynamicPortAction) { + case CREATE: + OutputPort<T> newOutputPort = createOutputPort(); + InputPort<T> newInputPort = dynamicPortAction.inputPort; + spScPipeFactory.create(newOutputPort, newInputPort); + break; + case REMOVE: + // TODO implement "remove port at runtime" + break; + default: + if (logger.isWarnEnabled()) { + logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.dynamicPortAction); + } + break; + } + + this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); + } +} -- GitLab