Skip to content
Snippets Groups Projects
Commit 6763848f authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Change eclipse preferences and improve workload distribution

parent b9ad9a48
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
Showing with 74 additions and 28 deletions
......@@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
......
......@@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
......
......@@ -48,6 +48,8 @@ public final class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender
final Properties kafkaProperties = new Properties();
......@@ -65,6 +67,7 @@ public final class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
......@@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
......
......@@ -11,6 +11,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
......@@ -29,15 +30,17 @@ public class WorkloadDistributor {
private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final int instances;
private final ZooKeeper zooKeeper;
private final CuratorFramework client;
private boolean workloadGenerationStarted = true;
/**
* Create a new workload distributor.
*
......@@ -46,10 +49,12 @@ public class WorkloadDistributor {
* @param workerAction the action to perform by the workers.
*/
public WorkloadDistributor(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Integer> workerAction) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
......@@ -89,7 +94,7 @@ public class WorkloadDistributor {
final CuratorWatcher watcher = this.buildWatcher(workerId);
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
......@@ -99,14 +104,10 @@ public class WorkloadDistributor {
// register worker action, as master acts also as worker
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
Thread.sleep(10000); // wait for all workers to participate in the leader election
final int numberOfWorkers = this.counter.get().postValue();
LOGGER.info("Number of Workers: {}", numberOfWorkers);
LOGGER.info("Number of Workers: {}", this.instances);
final WorkloadDefinition definition =
new WorkloadDefinition(this.keySpace, numberOfWorkers);
new WorkloadDefinition(this.keySpace, this.instances);
this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH,
definition.toString().getBytes(StandardCharsets.UTF_8));
......@@ -115,12 +116,39 @@ public class WorkloadDistributor {
LOGGER.info("This instance is worker with id {}", workerId);
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
final Stat exists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (exists != null) {
this.startWorkloadGeneration(workerId);
}
}
Thread.sleep(20000); // wait until the workload definition is retrieved
Thread.sleep(20_000);
} catch (final Exception e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when starting thze distribution of the workload.");
throw new IllegalStateException("Error when starting the distribution of the workload.");
}
}
/**
* Start the workload generation. This methods body does only get executed once.
*
* @param workerId the ID of this worker
* @throws Exception
*/
private synchronized void startWorkloadGeneration(final int workerId) throws Exception {
if (!this.workloadGenerationStarted) {
this.workloadGenerationStarted = true;
final byte[] bytes =
this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
this.workerAction.accept(definition, workerId);
}
}
......@@ -134,20 +162,13 @@ public class WorkloadDistributor {
return new CuratorWatcher() {
@Override
public void process(final WatchedEvent event) throws Exception {
public void process(final WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
final byte[] bytes =
WorkloadDistributor.this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
if (workerId > definition.getNumberOfWorkers() - 1) {
LOGGER.warn("Worker with id {} was to slow and is therefore in idle state",
workerId);
WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0),
workerId); // this worker generates no workload
} else {
WorkloadDistributor.this.workerAction.accept(definition, workerId);
try {
WorkloadDistributor.this.startWorkloadGeneration(workerId);
} catch (final Exception e) {
LOGGER.error("", e);
throw new IllegalStateException("Error starting workload generation.");
}
}
}
......
......@@ -26,6 +26,8 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
private final int instances;
private final ZooKeeper zooKeeper;
private final KeySpace keySpace;
......@@ -63,6 +65,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
}
public AbstractWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
......@@ -71,6 +74,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
final Transport<T> transport) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.period = period;
this.threads = threads;
......@@ -123,6 +127,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
};
this.workloadDistributor =
new WorkloadDistributor(this.zooKeeper, this.keySpace, this.beforeAction, workerAction);
new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction,
workerAction);
}
}
......@@ -32,6 +32,7 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
* @param recordSender the record sender which is used to send the generated messages to kafka.
*/
public KafkaWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
......@@ -40,7 +41,8 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
final KafkaRecordSender<T> recordSender) {
super(zooKeeper, keySpace, threads, period, duration, beforeAction, generatorFunction,
super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction,
generatorFunction,
recordSender);
this.recordSender = recordSender;
}
......
......@@ -11,6 +11,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
private int instances;
private ZooKeeper zooKeeper;
private KeySpace keySpace;
......@@ -40,6 +42,17 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
return new KafkaWorkloadGeneratorBuilder<>();
}
/**
* Set the number of instances.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) {
this.instances = instances;
return this;
}
/**
* Set the ZooKeeper reference.
*
......@@ -145,6 +158,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
* @return the built instance of the {@link KafkaWorkloadGenerator}.
*/
public KafkaWorkloadGenerator<T> build() {
Objects.requireNonNull(this.instances, "Please specify the number of instances.");
Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance.");
this.threads = Objects.requireNonNullElse(this.threads, 1);
Objects.requireNonNull(this.keySpace, "Please specify the key space.");
......@@ -156,6 +170,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender.");
return new KafkaWorkloadGenerator<>(
this.instances,
this.zooKeeper,
this.keySpace,
this.threads,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment