Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.Objects;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Builder for {@link workload generators}.
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD
private int instances; // NOPMD
private ZooKeeper zooKeeper; // NOPMD
private KeySpace keySpace; // NOPMD
private int threads; // NOPMD
private Duration period; // NOPMD
private Duration duration; // NOPMD
private BeforeAction beforeAction; // NOPMD
private MessageGenerator<T> generatorFunction; // NOPMD
private KafkaRecordSender<T> kafkaRecordSender; // NOPMD
private KafkaWorkloadGeneratorBuilder() {
}
/**
* Get a builder for the {@link KafkaWorkloadGenerator}.
*
* @return the builder.
*/
public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
return new KafkaWorkloadGeneratorBuilder<>();
}
/**
* Set the number of instances.
*
* @param instances the number of instances.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) {
this.instances = instances;
return this;
}
/**
* Set the ZooKeeper reference.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
return this;
}
/**
* Set the before action for the {@link KafkaWorkloadGenerator}.
*
* @param beforeAction the {@link BeforeAction}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) {
this.beforeAction = beforeAction;
return this;
}
/**
* Set the key space for the {@link KafkaWorkloadGenerator}.
*
* @param keySpace the {@link KeySpace}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) {
this.keySpace = keySpace;
return this;
}
/**
* Set the key space for the {@link KafkaWorkloadGenerator}.
*
* @param threads the number of threads.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) {
this.threads = threads;
return this;
}
/**
* Set the period for the {@link KafkaWorkloadGenerator}.
*
* @param period the {@link Period}
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) {
this.period = period;
return this;
}
/**
* Set the durtion for the {@link KafkaWorkloadGenerator}.
*
* @param duration the {@link Duration}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) {
this.duration = duration;
return this;
}
/**
* Set the generator function for the {@link KafkaWorkloadGenerator}.
*
* @param generatorFunction the generator function.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> generatorFunction(
final MessageGenerator<T> generatorFunction) {
this.generatorFunction = generatorFunction;
return this;
}
/**
* Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}.
*
* @param kafkaRecordSender the record sender to use.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender(
final KafkaRecordSender<T> kafkaRecordSender) {
this.kafkaRecordSender = kafkaRecordSender;
return this;
}
/**
* Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be
* specicified before this method is called:
* <ul>
* <li>zookeeper</li>
* <li>key space</li>
* <li>period</li>
* <li>duration</li>
* <li>generator function</li>
* <li>kafka record sender</li>
* </ul>
*
* @return the built instance of the {@link KafkaWorkloadGenerator}.
*/
public KafkaWorkloadGenerator<T> build() {
if (this.instances < 1) { // NOPMD
throw new IllegalArgumentException(
"Please specify a valid number of instances. Currently: " + this.instances);
}
Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance.");
if (this.threads < 1) { // NOPMD
this.threads = 1;
}
Objects.requireNonNull(this.keySpace, "Please specify the key space.");
Objects.requireNonNull(this.period, "Please specify the period.");
Objects.requireNonNull(this.duration, "Please specify the duration.");
this.beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> {
});
Objects.requireNonNull(this.generatorFunction, "Please specify the generator function.");
Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender.");
return new KafkaWorkloadGenerator<>(
this.instances,
this.zooKeeper,
this.keySpace,
this.threads,
this.period,
this.duration,
this.beforeAction,
this.generatorFunction,
this.kafkaRecordSender);
}
}
package theodolite.commons.workloadgeneration.generators;
/**
* Base methods for workload generators.
*/
public interface WorkloadGenerator {
/**
* Start the workload generation.
*/
void start();
/**
* Stop the workload generation.
*/
void stop();
}
package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
/**
* The central class that contains all information that needs to be exchanged between the nodes for
* distributed workload generation.
*/
public class WorkloadDefinition {
private static final int ZERO = 0;
private static final int ONE = 1;
private static final int TWO = 2;
private static final int THREE = 3;
private static final int FOUR = 4;
private final KeySpace keySpace;
private final int numberOfWorkers;
/**
* Create a new workload definition.
*
* @param keySpace the key space to use.
* @param numberOfWorkers the number of workers participating in the workload generation.
*/
public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) {
this.keySpace = keySpace;
this.numberOfWorkers = numberOfWorkers;
}
public KeySpace getKeySpace() {
return this.keySpace;
}
public int getNumberOfWorkers() {
return this.numberOfWorkers;
}
/**
* Simple method for encoding all information of the workload definition into one string.
*
* @return a string that encodes all information of the workload generation in a compact format.
* The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'.
*/
@Override
public String toString() {
return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";"
+ this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers();
}
/**
* Parse a workload generation from a previously encoded string with the format returned by
* {@link WorkloadDefinition#toString()}.
*
* @param workloadDefinitionString the workload definition string.
* @return the parsed workload definition.
*/
public static WorkloadDefinition fromString(final String workloadDefinitionString) {
final String[] deserialized = workloadDefinitionString.split(";");
if (deserialized.length != FOUR) {
throw new IllegalArgumentException(
"Wrong workload definition string when trying to parse the workload generation.");
}
return new WorkloadDefinition(
new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
Integer.valueOf(deserialized[TWO])),
Integer.valueOf(deserialized[THREE]));
}
}
package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
/**
* Representation of a entity of the workload generation that generates load for one fixed key.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class WorkloadEntity<T> {
private final String key;
private final MessageGenerator<T> generator;
public WorkloadEntity(final String key, final MessageGenerator<T> generator) {
this.key = key;
this.generator = generator;
}
public T generateMessage() {
return this.generator.generateMessage(this.key);
}
}
package theodolite.commons.workloadgeneration.misc;
/**
* Wrapper for connection information for ZooKeeper.
*/
public class ZooKeeper {
private final String host;
private final int port;
/**
* Create a new representation of an ZooKeeper instance.
*
* @param host of zookeeper.
* @param port of zookeeper.
*/
public ZooKeeper(final String host, final int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return this.host;
}
public int getPort() {
return this.port;
}
}