Skip to content
Snippets Groups Projects
Commit ffe78ce0 authored by Sören Henning's avatar Sören Henning
Browse files

Introduce new workload generator based on Hazelcast

parent fcc9b6c9
No related branches found
No related tags found
No related merge requests found
Showing
with 771 additions and 32 deletions
......@@ -8,8 +8,8 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......
......@@ -8,8 +8,8 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......
......@@ -8,8 +8,8 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -72,7 +72,7 @@ public final class LoadGenerator {
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
new KafkaRecordSender.Builder<ActivePowerRecord>(
KafkaRecordSender.<ActivePowerRecord>builder(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)
......
......@@ -8,8 +8,8 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......
dependencies {
implementation 'org.apache.curator:curator-recipes:4.3.0'
implementation 'com.hazelcast:hazelcast:4.1.1'
// compile group: "com.hazelcast", name: "hazelcast-kubernetes", version: "4.1.1"
}
\ No newline at end of file
package theodolite.commons.workloadgeneration;
public class ClusterConfig {
public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
public static final int PORT_DEFAULT = 5701;
public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true;
public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
private final String bootstrapServer;
private final int port;
private final boolean portAutoIncrement;
private final String clusterNamePrefix;
public ClusterConfig() {
this(
BOOTSTRAP_SERVER_DEFAULT,
PORT_DEFAULT,
PORT_AUTO_INCREMENT_DEFAULT,
CLUSTER_NAME_PREFIX_DEFAULT);
}
public ClusterConfig(final String bootstrapServer, final int port,
final boolean portAutoIncrement, final String clusterNamePrefix) {
this.bootstrapServer = bootstrapServer;
this.port = port;
this.portAutoIncrement = portAutoIncrement;
this.clusterNamePrefix = clusterNamePrefix;
}
public String getBootstrapServer() {
return this.bootstrapServer;
}
public int getPort() {
return this.port;
}
public boolean isPortAutoIncrement() {
return this.portAutoIncrement;
}
public String getClusterNamePrefix() {
return this.clusterNamePrefix;
}
}
package theodolite.commons.workloadgeneration;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
public static final String PORT = "PORT";
public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
public static final String NUM_SENSORS = "NUM_SENSORS";
public static final String PERIOD_MS = "PERIOD_MS";
public static final String VALUE = "VALUE";
public static final String THREADS = "THREADS";
public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC";
public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE";
public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS";
public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
private ConfigurationKeys() {}
}
package theodolite.commons.workloadgeneration;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class HazelcastRunner {
private final HazelcastInstance hzInstance;
private volatile HazelcastRunnerStateInstance runnerState;
private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
private final LoadGeneratorConfig loadConfig;
private final WorkloadDefinition totalLoadDefinition;
public HazelcastRunner(
final ClusterConfig clusterConfig,
final LoadGeneratorConfig loadConfig,
final WorkloadDefinition totalLoadDefinition) {
this.loadConfig = loadConfig;
this.totalLoadDefinition = totalLoadDefinition;
this.hzInstance = buildhazelcastInstance(clusterConfig, totalLoadDefinition.toString());
this.hzInstance.getCluster().addMembershipListener(new RunnerMembershipListener());
}
public void runBlocking() {
while (!this.stopAction.isDone()) {
synchronized (this) {
final Set<Member> members = this.hzInstance.getCluster().getMembers();
this.runnerState = new HazelcastRunnerStateInstance(
this.loadConfig,
this.totalLoadDefinition,
this.hzInstance, members);
}
this.runnerState.runBlocking();
}
}
public void restart() {
this.stopRunnerState();
}
public void stop() {
this.stopAction.complete(null);
this.stopRunnerState();
}
private void stopRunnerState() {
synchronized (this) {
if (this.runnerState != null) {
this.runnerState.stopAsync();
}
}
}
private class RunnerMembershipListener implements MembershipListener {
@Override
public void memberAdded(final MembershipEvent membershipEvent) {
HazelcastRunner.this.restart();
}
@Override
public void memberRemoved(final MembershipEvent membershipEvent) {
HazelcastRunner.this.restart();
}
}
private static HazelcastInstance buildhazelcastInstance(
final ClusterConfig cluster,
final String clusterName) {
final Config config = new Config()
.setClusterName(cluster.getClusterNamePrefix() + '_' + clusterName);
final JoinConfig joinConfig = config.getNetworkConfig()
.setPort(cluster.getPort())
.setPortAutoIncrement(cluster.isPortAutoIncrement())
.getJoin();
joinConfig.getMulticastConfig().setEnabled(false);
// joinConfig.getKubernetesConfig().setEnabled(true);
joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer());
return Hazelcast.newHazelcastInstance(config);
}
}
package theodolite.commons.workloadgeneration;
import com.google.common.collect.Streams;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.IAtomicReference;
import com.hazelcast.cp.lock.FencedLock;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO Reconsider name -> Hazelcast Instance
public class HazelcastRunnerStateInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRunnerStateInstance.class);
private static final Duration BEFORE_ACTION_WAIT_DURATION = Duration.ofMillis(500);
private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500);
private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
private LoadGeneratorExecution loadGeneratorExecution;
private final LoadGeneratorConfig loadGeneratorConfig;
private final WorkloadDefinition totalLoadDefinition;
private final HazelcastInstance hzInstance;
private final Set<Member> members;
public HazelcastRunnerStateInstance(
final LoadGeneratorConfig loadGeneratorConfig,
final WorkloadDefinition totalLoadDefinition,
final HazelcastInstance hzInstance,
final Set<Member> members) {
this.hzInstance = hzInstance;
this.members = members;
this.loadGeneratorConfig = loadGeneratorConfig;
this.totalLoadDefinition = totalLoadDefinition;
LOGGER.info("Created new Hazelcast runner instance for member set '{}'", this.members);
}
public void runBlocking() {
if (!this.stopAction.isDone()) {
this.tryPerformBeforeAction();
this.tryCreateTaskAssignment();
this.startLoadGeneration();
}
this.stopAction.join();
this.stopLoadGeneration();
}
public void stopAsync() {
this.stopAction.complete(null);
}
private void tryPerformBeforeAction() {
final FencedLock lock = this.getBeforeActionPerformerLock();
final IAtomicReference<Boolean> isActionPerformed = this.getIsBeforeActionPerformed(); // NOPMD
isActionPerformed.alter(p -> p != null && p); // p -> p == null ? false : p
boolean triedPerformingBeforeAction = false;
while (!isActionPerformed.get()) {
// Try performing the before action
triedPerformingBeforeAction = true;
if (lock.tryLock()) {
try {
if (!isActionPerformed.get()) {
LOGGER.info("This instance is elected to perform the before action.");
this.loadGeneratorConfig.getBeforeAction().run();
LOGGER.info("Before action performed.");
isActionPerformed.set(true);
}
} finally {
lock.unlock();
}
} else {
LOGGER.info("Wait for before action to be performed.");
delay(BEFORE_ACTION_WAIT_DURATION);
}
}
if (!triedPerformingBeforeAction) {
LOGGER.info("Before action has already been performed.");
}
}
private void tryCreateTaskAssignment() {
final Map<UUID, WorkloadDefinition> taskAssignment = this.getTaskAssignment();
final FencedLock lock = this.getTaskAssignmentLock();
boolean triedCreatingTaskAssignment = false;
while (taskAssignment.size() != this.members.size()) {
// Try creating task assignment
triedCreatingTaskAssignment = true;
if (lock.tryLock()) {
try {
if (taskAssignment.size() != this.members.size()) {
LOGGER.info("This instance is elected to create the task assignment.");
final Set<WorkloadDefinition> subLoadDefinitions =
this.totalLoadDefinition.divide(this.members.size());
Streams
.zip(
subLoadDefinitions.stream(),
this.members.stream(),
(loadDef, member) -> new LoadDefPerMember(loadDef, member))
.forEach(l -> taskAssignment.put(l.member.getUuid(), l.loadDefinition));
LOGGER.info("Task assignment created.");
}
} finally {
lock.unlock();
}
} else {
LOGGER.info("Wait for task assignment to be available.");
delay(TASK_ASSIGNMENT_WAIT_DURATION);
}
}
if (!triedCreatingTaskAssignment) {
LOGGER.info("Task assignment is already available.");
}
}
private void startLoadGeneration() {
if (this.loadGeneratorExecution != null) {
throw new IllegalStateException("Load generation has already started before.");
}
LOGGER.info("Start running load generation and pick assigned task.");
final Member member = this.hzInstance.getCluster().getLocalMember();
final WorkloadDefinition workload = this.getTaskAssignment().get(member.getUuid());
LOGGER.info("Run load generation for assigned task: {}", workload);
this.loadGeneratorExecution = this.loadGeneratorConfig.buildLoadGeneratorExecution(workload);
this.loadGeneratorExecution.start();
}
private void stopLoadGeneration() {
this.loadGeneratorExecution.stop();
}
private IAtomicReference<Boolean> getIsBeforeActionPerformed() {
return this.hzInstance.getCPSubsystem().getAtomicReference("isBeforeActionPerformed");
}
private FencedLock getBeforeActionPerformerLock() {
return this.hzInstance.getCPSubsystem().getLock("beforeActionPerformer");
}
private Map<UUID, WorkloadDefinition> getTaskAssignment() {
return this.hzInstance.getReplicatedMap(this.getTaskAssignmentName());
}
private FencedLock getTaskAssignmentLock() {
return this.hzInstance.getCPSubsystem().getLock(this.getTaskAssignmentName() + "_assigner");
}
private String getTaskAssignmentName() {
return this.members.stream()
.map(m -> m.getUuid().toString())
.collect(Collectors.joining("/"));
}
private static void delay(final Duration duration) {
try {
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
}
private static final class LoadDefPerMember {
public final WorkloadDefinition loadDefinition;
public final Member member;
public LoadDefPerMember(final WorkloadDefinition loadDefinition, final Member member) {
this.loadDefinition = loadDefinition;
this.member = member;
}
}
}
package theodolite.commons.workloadgeneration.dimensions;
package theodolite.commons.workloadgeneration;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator;
/**
......@@ -12,34 +15,24 @@ public class KeySpace {
private final int min;
private final int max;
/**
* Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of
* each key will be determined by a number of the interval ({@code min}, {@code max}-1).
*
* @param prefix the prefix to use for all keys
* @param min the lower bound (inclusive) to start counting from
* @param max the upper bound (exclusive) to count to
* @param max the upper bound (inclusive) to count to
*/
public KeySpace(final String prefix, final int min, final int max) {
if (prefix == null || prefix.contains(";")) {
throw new IllegalArgumentException(
"The prefix must not be null and must not contain the ';' character.");
}
this.prefix = prefix;
this.min = min;
this.max = max;
}
public KeySpace(final String prefix, final int numberOfKeys) {
this(prefix, 0, numberOfKeys - 1);
}
public KeySpace(final int numberOfKeys) {
this("sensor_", 0, numberOfKeys - 1);
}
public String getPrefix() {
return this.prefix;
}
......@@ -53,4 +46,20 @@ public class KeySpace {
public int getMax() {
return this.max;
}
public int getCount() {
return this.getMax() - this.getMin() + 1;
}
public Collection<String> getKeys() {
return IntStream.rangeClosed(this.min, this.max)
.mapToObj(id -> this.prefix + id)
.collect(Collectors.toUnmodifiableList());
}
@Override
public String toString() {
return this.prefix + '[' + this.min + '-' + this.max + ']';
}
}
package theodolite.commons.workloadgeneration;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.TitanMessageGeneratorFactory;
public final class LoadGenerator {
private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition;
private LoadGeneratorConfig generatorConfig;
private boolean isStarted;
private LoadGenerator() {}
// Add constructor for creating from environment variables
public LoadGenerator setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD
this.clusterConfig = clusterConfig;
return this;
}
public LoadGenerator setLoadDefinition(final WorkloadDefinition loadDefinition) { // NOPMD
this.loadDefinition = loadDefinition;
return this;
}
public LoadGenerator setGeneratorConfig(final LoadGeneratorConfig generatorConfig) { // NOPMD
this.generatorConfig = generatorConfig;
return this;
}
public LoadGenerator withBeforeAction(final BeforeAction beforeAction) {
this.generatorConfig.setBeforeAction(beforeAction);
return this;
}
public LoadGenerator withThreads(final int threads) {
this.generatorConfig.setThreads(threads);
return this;
}
public void run() {
Objects.requireNonNull(this.clusterConfig, "No cluster config set.");
Objects.requireNonNull(this.generatorConfig, "No generator config set.");
Objects.requireNonNull(this.loadDefinition, "No load definition set.");
if (this.isStarted) {
throw new IllegalStateException("Load generator can only be started once.");
}
this.isStarted = true;
final HazelcastRunner runner = new HazelcastRunner(
this.clusterConfig,
this.generatorConfig,
this.loadDefinition);
runner.runBlocking();
}
public static LoadGenerator fromDefaults() {
return new LoadGenerator()
.setClusterConfig(new ClusterConfig())
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)
.forConstantValue(VALUE_DEFAULT)));
}
public static LoadGenerator fromEnvironment() {
final String bootstrapServer = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER),
ClusterConfig.BOOTSTRAP_SERVER_DEFAULT);
final int port = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PORT),
Integer.toString(ClusterConfig.PORT_DEFAULT)));
final boolean portAutoIncrement = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT),
Boolean.toString(ClusterConfig.PORT_AUTO_INCREMENT_DEFAULT)));
final String clusterNamePrefix = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX),
ClusterConfig.CLUSTER_NAME_PREFIX_DEFAULT);
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
return new LoadGenerator()
.setClusterConfig(new ClusterConfig(
bootstrapServer,
port,
portAutoIncrement,
clusterNamePrefix))
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl,
kafkaProperties)
.forConstantValue(value)));
}
public static void main(final String[] args) {
LoadGenerator.fromEnvironment()
.run();
}
}
package theodolite.commons.workloadgeneration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
public class LoadGeneratorConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorConfig.class);
private final MessageGenerator messageGenerator;
private BeforeAction beforeAction = BeforeAction.doNothing();
private int threads = 1;
public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
this.messageGenerator = messageGenerator;
}
public LoadGeneratorConfig(
final MessageGenerator messageGenerator,
final BeforeAction beforeAction,
final int threads) {
this.messageGenerator = messageGenerator;
this.beforeAction = beforeAction;
this.threads = threads;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public void setThreads(final int threads) {
this.threads = threads;
}
public void setBeforeAction(final BeforeAction beforeAction) {
this.beforeAction = beforeAction;
}
}
package theodolite.commons.workloadgeneration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
public class LoadGeneratorExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
private final Random random = new Random();
private final WorkloadDefinition workloadDefinition;
private final MessageGenerator messageGenerator;
private final ScheduledExecutorService executor;
public LoadGeneratorExecution(
final WorkloadDefinition workloadDefinition,
final MessageGenerator messageGenerator,
final int threads) {
this.workloadDefinition = workloadDefinition;
this.messageGenerator = messageGenerator;
this.executor = Executors.newScheduledThreadPool(threads);
}
public void start() {
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Generating records for {} keys.",
this.workloadDefinition.getKeySpace().getCount());
LOGGER.info("Experiment is going to be executed until cancelation...");
final int periodMs = (int) this.workloadDefinition.getPeriod().toMillis();
for (final String key : this.workloadDefinition.getKeySpace().getKeys()) {
final long initialDelay = this.random.nextInt(periodMs);
final Runnable task = () -> this.messageGenerator.generate(key);
this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
}
public void stop() {
this.executor.shutdownNow();
}
}
package theodolite.commons.workloadgeneration;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class WorkloadDefinition {
private final KeySpace keySpace;
private final Duration period;
/**
* Create a new workload definition.
*
* @param keySpace the key space to use.
*/
public WorkloadDefinition(
final KeySpace keySpace,
final Duration period) {
this.keySpace = keySpace;
this.period = period;
}
public KeySpace getKeySpace() {
return this.keySpace;
}
public Duration getPeriod() {
return this.period;
}
public Set<WorkloadDefinition> divide(final int parts) {
final int size = (this.keySpace.getCount() + parts - 1) / parts; // = ceil(count/parts)
return IntStream.range(0, parts)
.mapToObj(part -> new KeySpace(
this.keySpace.getPrefix(),
this.keySpace.getMin() + part * size,
Math.min(this.keySpace.getMin() + (part + 1) * size - 1, this.keySpace.getMax())))
.map(keySpace -> new WorkloadDefinition(
keySpace,
this.period))
.collect(Collectors.toUnmodifiableSet());
}
@Override
public String toString() {
return this.keySpace + ";" + this.period.toMillis();
}
}
......@@ -9,7 +9,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.Transport;
import theodolite.commons.workloadgeneration.functions.RecordSender;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
......@@ -17,7 +17,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
......@@ -47,10 +47,19 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
avroSerdeFactory.<T>forKeys().serializer());
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl) {
return new Builder<>(bootstrapServers, topic, schemaRegistryUrl);
}
/**
* Builder class to build a new {@link KafkaRecordSender}.
*
......@@ -72,7 +81,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/
public Builder(final String bootstrapServers, final String topic,
private Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
this.topic = topic;
......@@ -116,7 +125,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
}
@Override
public void transport(final T message) {
public void send(final T message) {
this.write(message);
}
......
......@@ -8,4 +8,9 @@ public interface BeforeAction {
public void run();
public static BeforeAction doNothing() {
return () -> {
};
}
}
package theodolite.commons.workloadgeneration.functions;
/**
* This interface describes a function that takes meta information from a string (e.g. an ID) and
* produces an object of type T.
*
* @param <T> the type of the objects that will be generated by the function.
*/
@FunctionalInterface
public interface MessageGenerator<T> {
public interface MessageGenerator {
T generateMessage(final String key);
void generate(final String key);
public static <T> MessageGenerator from(
final RecordGenerator<T> generator,
final RecordSender<T> sender) {
return key -> sender.send(generator.generate(key));
}
}
package theodolite.commons.workloadgeneration.functions;
/**
* This interface describes a function that takes meta information from a string (e.g. an ID) and
* produces an object of type T.
*
* @param <T> the type of the objects that will be generated by the function.
*/
@FunctionalInterface
public interface RecordGenerator<T> {
T generate(final String key);
}
......@@ -7,8 +7,8 @@ package theodolite.commons.workloadgeneration.functions;
* @param <T> the type of records to send as messages.
*/
@FunctionalInterface
public interface Transport<T> {
public interface RecordSender<T> {
void transport(final T message);
void send(final T message);
}
package theodolite.commons.workloadgeneration.functions;
import java.util.Properties;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import titan.ccp.model.records.ActivePowerRecord;
public final class TitanMessageGeneratorFactory {
private final RecordSender<ActivePowerRecord> recordSender;
private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
this.recordSender = recordSender;
}
public MessageGenerator forConstantValue(final double value) {
return MessageGenerator.from(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
this.recordSender);
}
public static TitanMessageGeneratorFactory withKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl) {
return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
}
public static TitanMessageGeneratorFactory withKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl,
final Properties properties) {
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender
.<ActivePowerRecord>builder(
bootstrapServers,
topic,
schemaRegistryUrl)
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
return new TitanMessageGeneratorFactory(kafkaRecordSender);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment