diff --git a/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..f8453e03cf7f20d1348925cc050fa8663afaa4cc --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.stage.basic.merger.strategy; + +import teetime.framework.InputPort; +import teetime.stage.basic.merger.Merger; + +import java.util.List; + +/** + * @author Christian Wulf + * + * @since 2.0 + */ +public final class SkippingBusyWaitingRoundRobinStrategy implements IMergerStrategy { + + private int index = 0; + + @Override + public <T> T getNextInput(final Merger<T> merger) { + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + final int startedIndex = index; + + @SuppressWarnings("unchecked") + // handles pipe.isClosed() and pipe.isEmpty() + T token = (T) inputPorts.get(index).receive(); + while (token == null) { + this.index = (this.index + 1) % inputPorts.size(); + if (index == startedIndex) { + return null; + } + token = (T) inputPorts.get(index).receive(); + } + + return token; + } + + @Override + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().size(); + } +} diff --git a/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java b/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java index 4082084c424877aeb0cb3cffe4a70462bb2a8a07..640491eec74bee0389ecbe42bc7a4167245ac4af 100644 --- a/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java +++ b/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java @@ -29,6 +29,8 @@ import teetime.stage.basic.distributor.dynamic.DynamicDistributor; import teetime.stage.basic.distributor.dynamic.RemovePortActionDistributor; import teetime.stage.basic.merger.dynamic.CreatePortActionMerger; import teetime.stage.basic.merger.dynamic.DynamicMerger; +import teetime.stage.basic.merger.strategy.IMergerStrategy; +import teetime.stage.basic.merger.strategy.SkippingBusyWaitingRoundRobinStrategy; import teetime.stage.taskfarm.exception.TaskFarmControllerException; import teetime.stage.taskfarm.exception.TaskFarmInvalidPipeException; import teetime.util.framework.port.PortAction; @@ -90,7 +92,7 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I, } if (merger == null) { - this.merger = new MultiStageMerger<O>(); + this.merger = new MultiStageMerger<O>(new SkippingBusyWaitingRoundRobinStrategy()); } else { this.merger = merger; } @@ -309,6 +311,10 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I, private static class MultiStageMerger<T> extends DynamicMerger<T> { + public MultiStageMerger(IMergerStrategy strategy) { + super(strategy); + } + @Override public void onTerminating() throws Exception { // foreach on portActions is not implemented, so we iterate by ourselves diff --git a/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java b/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java index 98c47d36f1cd14ec913a869b8df0e6625907fabb..5bbbd2d6250d262a845096fffb9db075ea14519d 100644 --- a/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java +++ b/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java @@ -63,7 +63,6 @@ public class CPUTestExecution { System.out.println("Warmup #" + i + " started"); // final MultiStageMixedWorkloadConfiguration configuration = execute(arguments); final PLHHSConfiguration configuration = execute(arguments); - } for (int i = 0; i < arguments.getRealExecutions(); i++) { diff --git a/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java b/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java index 30f8cd0909892ca1760db8f51f4b7b2c5592e362..9e06a5fd90f54757d865cce42349e2af1cf0081f 100644 --- a/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java +++ b/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java @@ -17,7 +17,7 @@ public class SimpleThreadAssignmentTest { "--warmup", "5", "--real","10", "--count","20000", - "--assignment", simple, + "--assignment", stable, "--highWorkload","20000" ,"--dir","result" ,"--metric","PushPullDifference" @@ -25,7 +25,7 @@ public class SimpleThreadAssignmentTest { , "-nos", "10" - ,"-q", "1" + ,"-q", "2" }; CPUTestExecution.main(args);