Skip to content
Snippets Groups Projects
Commit b8afbe23 authored by Christoph Dornieden's avatar Christoph Dornieden
Browse files

created new merger strategy to avoid blocking

parent 70ad5ed9
No related branches found
No related tags found
No related merge requests found
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.basic.merger.strategy;
import teetime.framework.InputPort;
import teetime.stage.basic.merger.Merger;
import java.util.List;
/**
* @author Christian Wulf
*
* @since 2.0
*/
public final class SkippingBusyWaitingRoundRobinStrategy implements IMergerStrategy {
private int index = 0;
@Override
public <T> T getNextInput(final Merger<T> merger) {
final List<InputPort<?>> inputPorts = merger.getInputPorts();
final int startedIndex = index;
@SuppressWarnings("unchecked")
// handles pipe.isClosed() and pipe.isEmpty()
T token = (T) inputPorts.get(index).receive();
while (token == null) {
this.index = (this.index + 1) % inputPorts.size();
if (index == startedIndex) {
return null;
}
token = (T) inputPorts.get(index).receive();
}
return token;
}
@Override
public void onPortRemoved(final InputPort<?> removedInputPort) {
Merger<?> merger = (Merger<?>) removedInputPort.getOwningStage();
// correct the index if it is out-of-bounds
this.index = (this.index + 1) % merger.getInputPorts().size();
}
}
......@@ -29,6 +29,8 @@ import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
import teetime.stage.basic.distributor.dynamic.RemovePortActionDistributor;
import teetime.stage.basic.merger.dynamic.CreatePortActionMerger;
import teetime.stage.basic.merger.dynamic.DynamicMerger;
import teetime.stage.basic.merger.strategy.IMergerStrategy;
import teetime.stage.basic.merger.strategy.SkippingBusyWaitingRoundRobinStrategy;
import teetime.stage.taskfarm.exception.TaskFarmControllerException;
import teetime.stage.taskfarm.exception.TaskFarmInvalidPipeException;
import teetime.util.framework.port.PortAction;
......@@ -90,7 +92,7 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I,
}
if (merger == null) {
this.merger = new MultiStageMerger<O>();
this.merger = new MultiStageMerger<O>(new SkippingBusyWaitingRoundRobinStrategy());
} else {
this.merger = merger;
}
......@@ -309,6 +311,10 @@ public final class TaskFarmStagecdor<I, O, T extends ITaskFarmDuplicablecdor<I,
private static class MultiStageMerger<T> extends DynamicMerger<T> {
public MultiStageMerger(IMergerStrategy strategy) {
super(strategy);
}
@Override
public void onTerminating() throws Exception {
// foreach on portActions is not implemented, so we iterate by ourselves
......
......@@ -63,7 +63,6 @@ public class CPUTestExecution {
System.out.println("Warmup #" + i + " started");
// final MultiStageMixedWorkloadConfiguration configuration = execute(arguments);
final PLHHSConfiguration configuration = execute(arguments);
}
for (int i = 0; i < arguments.getRealExecutions(); i++) {
......
......@@ -17,7 +17,7 @@ public class SimpleThreadAssignmentTest {
"--warmup", "5",
"--real","10",
"--count","20000",
"--assignment", simple,
"--assignment", stable,
"--highWorkload","20000"
,"--dir","result"
,"--metric","PushPullDifference"
......@@ -25,7 +25,7 @@ public class SimpleThreadAssignmentTest {
, "-nos", "10"
,"-q", "1"
,"-q", "2"
};
CPUTestExecution.main(args);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment