Skip to content
Snippets Groups Projects
Commit 27d79084 authored by Sören Henning's avatar Sören Henning
Browse files

Remove old classes for load generation with ZooKeeper

parent f350b623
No related branches found
No related tags found
No related merge requests found
Showing
with 0 additions and 726 deletions
package theodolite.commons.workloadgeneration.communication.zookeeper;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.BeforeAction;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* The central class responsible for distributing the workload through all workload generators.
*/
@Deprecated
public class WorkloadDistributor {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class);
private static final String NAMESPACE = "workload-generation";
private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
// Curator retry strategy
private static final int BASE_SLEEP_TIME_MS = 2000;
private static final int MAX_RETRIES = 5;
// Wait time
private static final int MAX_WAIT_TIME = 20_000;
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final int instances;
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final CuratorFramework client;
private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false
/**
* Create a new workload distributor.
*
* @param keySpace the keyspace for the workload generation.
* @param beforeAction the before action for the workload generation.
* @param workerAction the action to perform by the workers.
*/
public WorkloadDistributor(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Integer> workerAction) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
this.workerAction = workerAction;
this.client = CuratorFrameworkFactory.builder()
.namespace(NAMESPACE)
.connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
.build();
this.client.start();
try {
this.client.blockUntilConnected();
} catch (final InterruptedException e) {
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException(e);
}
this.counter =
new DistributedAtomicInteger(this.client, COUNTER_PATH,
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
}
/**
* Start the workload distribution.
*/
public void start() {
try {
AtomicValue<Integer> result = this.counter.increment();
while (!result.succeeded()) {
result = this.counter.increment();
}
final int workerId = result.preValue();
final CuratorWatcher watcher = this.buildWatcher(workerId);
final Stat nodeExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
if (nodeExists == null) {
this.client.create().forPath(WORKLOAD_PATH);
}
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
this.beforeAction.run();
// register worker action, as master acts also as worker
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
LOGGER.info("Number of Workers: {}", this.instances);
final WorkloadDefinition definition =
new WorkloadDefinition(this.keySpace, this.instances);
this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH,
definition.toString().getBytes(StandardCharsets.UTF_8));
} else {
LOGGER.info("This instance is worker with id {}", workerId);
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
final Stat definitionExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (definitionExists != null) {
this.startWorkloadGeneration(workerId);
}
}
Thread.sleep(MAX_WAIT_TIME);
if (!this.workloadGenerationStarted) {
LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
}
} catch (final Exception e) { // NOPMD need to catch exception because of external framework
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error when starting the distribution of the workload.", e);
}
}
/**
* Start the workload generation. This methods body does only get executed once.
*
* @param workerId the ID of this worker
* @throws Exception when an error occurs
*/
// NOPMD because exception thrown from used framework
private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD
if (!this.workloadGenerationStarted) {
this.workloadGenerationStarted = true;
final byte[] bytes =
this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
this.workerAction.accept(definition, workerId);
}
}
/**
* Build a curator watcher which performs the worker action.
*
* @param worker the worker to create the watcher for.
* @return the curator watcher.
*/
private CuratorWatcher buildWatcher(final int workerId) {
return new CuratorWatcher() {
@Override
public void process(final WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
try {
WorkloadDistributor.this.startWorkloadGeneration(workerId);
} catch (final Exception e) { // NOPMD external framework throws exception
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error starting workload generation.", e);
}
}
}
};
}
/**
* Stop the workload distributor.
*/
public void stop() {
this.client.close();
}
}
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.BeforeAction;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.RecordGenerator;
import theodolite.commons.workloadgeneration.RecordSender;
import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Base for workload generators.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public abstract class AbstractWorkloadGenerator<T>
implements WorkloadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
private final int instances; // NOPMD keep instance variable instead of local variable
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable
private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable
private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
private final RecordGenerator<T> generatorFunction;
private final RecordSender<T> transport;
private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local
private final ScheduledExecutorService executor;
/**
* Create a new workload generator.
*
* @param instances the number of workload-generator instances.
* @param zooKeeper the zookeeper connection.
* @param keySpace the keyspace.
* @param threads the number of threads that is used to generate the load.
* @param period the period, how often a new record is emitted.
* @param duration the maximum runtime.
* @param beforeAction the action to perform before the workload generation starts.
* @param generatorFunction the function that is used to generate the individual records.
* @param transport the function that is used to send generated messages to the messaging system.
*/
public AbstractWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final RecordGenerator<T> generatorFunction,
final RecordSender<T> transport) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
this.generatorFunction = generatorFunction;
this.workloadSelector = (workloadDefinition, workerId) -> {
final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>();
for (int i =
workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition
.getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) {
final String id = workloadDefinition.getKeySpace().getPrefix() + i;
workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction));
}
return workloadEntities;
};
this.transport = transport;
this.executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
final int periodMs = (int) period.toMillis();
LOGGER.info("Period: {}", periodMs);
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Generating records for {} keys.", entities.size());
LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> {
final long initialDelay = random.nextInt(periodMs);
final Runnable task = () -> this.transport.send(entity.generateMessage());
this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
});
try {
this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS);
LOGGER.info("Terminating now...");
this.stop();
} catch (final InterruptedException e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when terminating the workload generation.", e);
}
};
this.workloadDistributor = new WorkloadDistributor(
this.instances,
this.zooKeeper,
this.keySpace,
this.beforeAction,
workerAction);
}
/**
* Start the workload generation. The generation terminates automatically after the specified
* {@code duration}.
*/
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
}
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.BeforeAction;
import theodolite.commons.workloadgeneration.KafkaRecordSender;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.RecordGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Workload generator for generating load for the kafka messaging system.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class KafkaWorkloadGenerator<T extends SpecificRecord>
extends AbstractWorkloadGenerator<T> {
private final KafkaRecordSender<T> recordSender;
/**
* Create a new workload generator.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @param keySpace the key space to generate the workload for.
* @param threads tha amount of threads to use per instance.
* @param period the period how often a message is generated for each key specified in the
* {@code keySpace}
* @param duration the duration how long the workload generator will emit messages.
* @param beforeAction the action which will be performed before the workload generator starts
* generating messages. If {@code null}, no before action will be performed.
* @param generatorFunction the generator function. This function is executed, each time a message
* is generated.
* @param recordSender the record sender which is used to send the generated messages to kafka.
*/
public KafkaWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final RecordGenerator<T> generatorFunction,
final KafkaRecordSender<T> recordSender) {
super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction,
generatorFunction,
recordSender);
this.recordSender = recordSender;
}
@Override
public void stop() {
this.recordSender.terminate();
super.stop();
}
}
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.Objects;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.BeforeAction;
import theodolite.commons.workloadgeneration.KafkaRecordSender;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.RecordGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Builder for {@link workload generators}.
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD
private int instances; // NOPMD
private ZooKeeper zooKeeper; // NOPMD
private KeySpace keySpace; // NOPMD
private int threads; // NOPMD
private Duration period; // NOPMD
private Duration duration; // NOPMD
private BeforeAction beforeAction; // NOPMD
private RecordGenerator<T> generatorFunction; // NOPMD
private KafkaRecordSender<T> kafkaRecordSender; // NOPMD
private KafkaWorkloadGeneratorBuilder() {
}
/**
* Get a builder for the {@link KafkaWorkloadGenerator}.
*
* @return the builder.
*/
public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
return new KafkaWorkloadGeneratorBuilder<>();
}
/**
* Set the number of instances.
*
* @param instances the number of instances.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) {
this.instances = instances;
return this;
}
/**
* Set the ZooKeeper reference.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
return this;
}
/**
* Set the before action for the {@link KafkaWorkloadGenerator}.
*
* @param beforeAction the {@link BeforeAction}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) {
this.beforeAction = beforeAction;
return this;
}
/**
* Set the key space for the {@link KafkaWorkloadGenerator}.
*
* @param keySpace the {@link KeySpace}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) {
this.keySpace = keySpace;
return this;
}
/**
* Set the key space for the {@link KafkaWorkloadGenerator}.
*
* @param threads the number of threads.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) {
this.threads = threads;
return this;
}
/**
* Set the period for the {@link KafkaWorkloadGenerator}.
*
* @param period the {@link Period}
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) {
this.period = period;
return this;
}
/**
* Set the durtion for the {@link KafkaWorkloadGenerator}.
*
* @param duration the {@link Duration}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) {
this.duration = duration;
return this;
}
/**
* Set the generator function for the {@link KafkaWorkloadGenerator}.
*
* @param generatorFunction the generator function.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> generatorFunction(
final RecordGenerator<T> generatorFunction) {
this.generatorFunction = generatorFunction;
return this;
}
/**
* Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}.
*
* @param kafkaRecordSender the record sender to use.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender(
final KafkaRecordSender<T> kafkaRecordSender) {
this.kafkaRecordSender = kafkaRecordSender;
return this;
}
/**
* Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be
* specicified before this method is called:
* <ul>
* <li>zookeeper</li>
* <li>key space</li>
* <li>period</li>
* <li>duration</li>
* <li>generator function</li>
* <li>kafka record sender</li>
* </ul>
*
* @return the built instance of the {@link KafkaWorkloadGenerator}.
*/
public KafkaWorkloadGenerator<T> build() {
if (this.instances < 1) { // NOPMD
throw new IllegalArgumentException(
"Please specify a valid number of instances. Currently: " + this.instances);
}
Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance.");
if (this.threads < 1) { // NOPMD
this.threads = 1;
}
Objects.requireNonNull(this.keySpace, "Please specify the key space.");
Objects.requireNonNull(this.period, "Please specify the period.");
Objects.requireNonNull(this.duration, "Please specify the duration.");
this.beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> {
});
Objects.requireNonNull(this.generatorFunction, "Please specify the generator function.");
Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender.");
return new KafkaWorkloadGenerator<>(
this.instances,
this.zooKeeper,
this.keySpace,
this.threads,
this.period,
this.duration,
this.beforeAction,
this.generatorFunction,
this.kafkaRecordSender);
}
}
package theodolite.commons.workloadgeneration.generators;
/**
* Base methods for workload generators.
*/
public interface WorkloadGenerator {
/**
* Start the workload generation.
*/
void start();
/**
* Stop the workload generation.
*/
void stop();
}
package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.KeySpace;
/**
* The central class that contains all information that needs to be exchanged between the nodes for
* distributed workload generation.
*/
public class WorkloadDefinition {
private static final int ZERO = 0;
private static final int ONE = 1;
private static final int TWO = 2;
private static final int THREE = 3;
private static final int FOUR = 4;
private final KeySpace keySpace;
private final int numberOfWorkers;
/**
* Create a new workload definition.
*
* @param keySpace the key space to use.
* @param numberOfWorkers the number of workers participating in the workload generation.
*/
public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) {
this.keySpace = keySpace;
this.numberOfWorkers = numberOfWorkers;
}
public KeySpace getKeySpace() {
return this.keySpace;
}
public int getNumberOfWorkers() {
return this.numberOfWorkers;
}
/**
* Simple method for encoding all information of the workload definition into one string.
*
* @return a string that encodes all information of the workload generation in a compact format.
* The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'.
*/
@Override
public String toString() {
return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";"
+ this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers();
}
/**
* Parse a workload generation from a previously encoded string with the format returned by
* {@link WorkloadDefinition#toString()}.
*
* @param workloadDefinitionString the workload definition string.
* @return the parsed workload definition.
*/
public static WorkloadDefinition fromString(final String workloadDefinitionString) {
final String[] deserialized = workloadDefinitionString.split(";");
if (deserialized.length != FOUR) {
throw new IllegalArgumentException(
"Wrong workload definition string when trying to parse the workload generation.");
}
return new WorkloadDefinition(
new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
Integer.valueOf(deserialized[TWO])),
Integer.valueOf(deserialized[THREE]));
}
}
package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.RecordGenerator;
/**
* Representation of a entity of the workload generation that generates load for one fixed key.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class WorkloadEntity<T> {
private final String key;
private final RecordGenerator<T> generator;
public WorkloadEntity(final String key, final RecordGenerator<T> generator) {
this.key = key;
this.generator = generator;
}
public T generateMessage() {
return this.generator.generate(this.key);
}
}
package theodolite.commons.workloadgeneration.misc;
/**
* Wrapper for connection information for ZooKeeper.
*/
@Deprecated
public class ZooKeeper {
private final String host;
private final int port;
/**
* Create a new representation of an ZooKeeper instance.
*
* @param host of zookeeper.
* @param port of zookeeper.
*/
public ZooKeeper(final String host, final int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return this.host;
}
public int getPort() {
return this.port;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment