Skip to content
Snippets Groups Projects

Add Distributed Workload Generator

3 files
+ 17
43
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -15,7 +15,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.misc.Worker;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
@@ -34,7 +33,7 @@ public class WorkloadDistributor {
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Worker> workerAction;
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final ZooKeeper zooKeeper;
private final CuratorFramework client;
@@ -50,7 +49,7 @@ public class WorkloadDistributor {
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Worker> workerAction) {
final BiConsumer<WorkloadDefinition, Integer> workerAction) {
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
@@ -86,14 +85,14 @@ public class WorkloadDistributor {
result = this.counter.increment();
}
final Worker worker = new Worker(result.preValue());
final int workerId = result.preValue();
final CuratorWatcher watcher = this.buildWatcher(worker);
final CuratorWatcher watcher = this.buildWatcher(workerId);
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (worker.getId() == 0) {
LOGGER.info("This instance is master with id {}", worker.getId());
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
this.beforeAction.run();
@@ -113,7 +112,7 @@ public class WorkloadDistributor {
definition.toString().getBytes(StandardCharsets.UTF_8));
} else {
LOGGER.info("This instance is worker with id {}", worker.getId());
LOGGER.info("This instance is worker with id {}", workerId);
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
}
@@ -131,7 +130,7 @@ public class WorkloadDistributor {
* @param worker the worker to create the watcher for.
* @return the curator watcher.
*/
private CuratorWatcher buildWatcher(final Worker worker) {
private CuratorWatcher buildWatcher(final int workerId) {
return new CuratorWatcher() {
@Override
@@ -142,13 +141,13 @@ public class WorkloadDistributor {
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
if (worker.getId() > definition.getNumberOfWorkers() - 1) {
if (workerId > definition.getNumberOfWorkers() - 1) {
LOGGER.warn("Worker with id {} was to slow and is therefore in idle state",
worker.getId());
workerId);
WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0),
worker); // this worker generates no workload
workerId); // this worker generates no workload
} else {
WorkloadDistributor.this.workerAction.accept(definition, worker);
WorkloadDistributor.this.workerAction.accept(definition, workerId);
}
}
}
Loading