From b8afbe23a733c7ffdf264a9b7da2bbc3bedbb41b Mon Sep 17 00:00:00 2001 From: Christoph Dornieden <cdor@informatik.uni-kiel.de> Date: Wed, 6 Jul 2016 15:42:46 +0200 Subject: [PATCH] created new merger strategy to avoid blocking --- ...SkippingBusyWaitingRoundRobinStrategy.java | 57 +++++++++++++++++++ .../stage/taskfarm/TaskFarmStagecdor.java | 8 ++- .../threadAssignment/CPUTestExecution.java | 1 - .../SimpleThreadAssignmentTest.java | 4 +- 4 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java diff --git a/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java new file mode 100644 index 000000000..f8453e03c --- /dev/null +++ b/src/main/java/teetime/stage/basic/merger/strategy/SkippingBusyWaitingRoundRobinStrategy.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.stage.basic.merger.strategy; + +import teetime.framework.InputPort; +import teetime.stage.basic.merger.Merger; + +import java.util.List; + +/** + * @author Christian Wulf + * + * @since 2.0 + */ +public final class SkippingBusyWaitingRoundRobinStrategy implements IMergerStrategy { + + private int index = 0; + + @Override + public <T> T getNextInput(final Merger<T> merger) { + final List<InputPort<?>> inputPorts = merger.getInputPorts(); + final int startedIndex = index; + + @SuppressWarnings("unchecked") + // handles pipe.isClosed() and pipe.isEmpty() + T token = (T) inputPorts.get(index).receive(); + while (token == null) { + this.index = (this.index + 1) % inputPorts.size(); + if (index == startedIndex) { + return null; + } + token = (T) inputPorts.get(index).receive(); + } + + return token; + } + + @Override + public void onPortRemoved(final InputPort<?> removedInputPort) { + Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage(); + // correct the index if it is out-of-bounds + this.index = (this.index + 1) % merger.getInputPorts().size(); + } +} diff --git a/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java b/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java index 4082084c4..640491eec 100644 --- a/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java +++ b/src/main/java/teetime/stage/taskfarm/TaskFarmStagecdor.java @@ -29,6 +29,8 @@ import teetime.stage.basic.distributor.dynamic.DynamicDistributor; import teetime.stage.basic.distributor.dynamic.RemovePortActionDistributor; import teetime.stage.basic.merger.dynamic.CreatePortActionMerger; import teetime.stage.basic.merger.dynamic.DynamicMerger; +import teetime.stage.basic.merger.strategy.IMergerStrategy; +import teetime.stage.basic.merger.strategy.SkippingBusyWaitingRoundRobinStrategy; import teetime.stage.taskfarm.exception.TaskFarmControllerException; import teetime.stage.taskfarm.exception.TaskFarmInvalidPipeException; import teetime.util.framework.port.PortAction; @@ -90,7 +92,7 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I, } if (merger == null) { - this.merger = new MultiStageMerger<O>(); + this.merger = new MultiStageMerger<O>(new SkippingBusyWaitingRoundRobinStrategy()); } else { this.merger = merger; } @@ -309,6 +311,10 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I, private static class MultiStageMerger<T> extends DynamicMerger<T> { + public MultiStageMerger(IMergerStrategy strategy) { + super(strategy); + } + @Override public void onTerminating() throws Exception { // foreach on portActions is not implemented, so we iterate by ourselves diff --git a/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java b/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java index 98c47d36f..5bbbd2d62 100644 --- a/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java +++ b/src/test/java/teetime/framework/threadAssignment/CPUTestExecution.java @@ -63,7 +63,6 @@ public class CPUTestExecution { System.out.println("Warmup #" + i + " started"); // final MultiStageMixedWorkloadConfiguration configuration = execute(arguments); final PLHHSConfiguration configuration = execute(arguments); - } for (int i = 0; i < arguments.getRealExecutions(); i++) { diff --git a/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java b/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java index 30f8cd090..9e06a5fd9 100644 --- a/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java +++ b/src/test/java/teetime/framework/threadAssignment/SimpleThreadAssignmentTest.java @@ -17,7 +17,7 @@ public class SimpleThreadAssignmentTest { "--warmup", "5", "--real","10", "--count","20000", - "--assignment", simple, + "--assignment", stable, "--highWorkload","20000" ,"--dir","result" ,"--metric","PushPullDifference" @@ -25,7 +25,7 @@ public class SimpleThreadAssignmentTest { , "-nos", "10" - ,"-q", "1" + ,"-q", "2" }; CPUTestExecution.main(args); -- GitLab