Skip to content
Snippets Groups Projects
Commit e0ee0a91 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Remove Worker class

parent ea2aa9ee
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
......@@ -15,7 +15,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.misc.Worker;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -34,7 +33,7 @@ public class WorkloadDistributor {
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Worker> workerAction;
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final ZooKeeper zooKeeper;
private final CuratorFramework client;
......@@ -50,7 +49,7 @@ public class WorkloadDistributor {
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Worker> workerAction) {
final BiConsumer<WorkloadDefinition, Integer> workerAction) {
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
......@@ -86,14 +85,14 @@ public class WorkloadDistributor {
result = this.counter.increment();
}
final Worker worker = new Worker(result.preValue());
final int workerId = result.preValue();
final CuratorWatcher watcher = this.buildWatcher(worker);
final CuratorWatcher watcher = this.buildWatcher(workerId);
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (worker.getId() == 0) {
LOGGER.info("This instance is master with id {}", worker.getId());
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
this.beforeAction.run();
......@@ -113,7 +112,7 @@ public class WorkloadDistributor {
definition.toString().getBytes(StandardCharsets.UTF_8));
} else {
LOGGER.info("This instance is worker with id {}", worker.getId());
LOGGER.info("This instance is worker with id {}", workerId);
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
}
......@@ -131,7 +130,7 @@ public class WorkloadDistributor {
* @param worker the worker to create the watcher for.
* @return the curator watcher.
*/
private CuratorWatcher buildWatcher(final Worker worker) {
private CuratorWatcher buildWatcher(final int workerId) {
return new CuratorWatcher() {
@Override
......@@ -142,13 +141,13 @@ public class WorkloadDistributor {
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
if (worker.getId() > definition.getNumberOfWorkers() - 1) {
if (workerId > definition.getNumberOfWorkers() - 1) {
LOGGER.warn("Worker with id {} was to slow and is therefore in idle state",
worker.getId());
workerId);
WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0),
worker); // this worker generates no workload
workerId); // this worker generates no workload
} else {
WorkloadDistributor.this.workerAction.accept(definition, worker);
WorkloadDistributor.this.workerAction.accept(definition, workerId);
}
}
}
......
......@@ -17,7 +17,6 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.functions.Transport;
import theodolite.commons.workloadgeneration.misc.Worker;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -39,7 +38,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
private final BeforeAction beforeAction;
private final BiFunction<WorkloadDefinition, Worker, List<WorkloadEntity<T>>> workloadSelector;
private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
private final MessageGenerator<T> generatorFunction;
......@@ -79,11 +78,11 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
this.duration = duration;
this.beforeAction = beforeAction;
this.generatorFunction = generatorFunction;
this.workloadSelector = (workloadDefinition, worker) -> {
this.workloadSelector = (workloadDefinition, workerId) -> {
final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>();
for (int i =
workloadDefinition.getKeySpace().getMin() + worker.getId(); i <= workloadDefinition
workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition
.getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) {
final String id = workloadDefinition.getKeySpace().getPrefix() + i;
workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction));
......@@ -98,9 +97,9 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
final int periodMs = period.getNano() / 1_000_000;
final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> {
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker);
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Experiment is going to be executed for the specified duration...");
......
package theodolite.commons.workloadgeneration.misc;
/*
* Wrapper class for a worker.
*/
public class Worker {
private final int id;
/**
* Create a new worker with an {@code id}
*
* @param id the id of the worker.
*/
public Worker(final int id) {
super();
this.id = id;
}
public int getId() {
return this.id;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment