Skip to content
Snippets Groups Projects
Commit 086892fa authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Cleanup and restructuring code

parent 618f3627
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
Showing
with 103 additions and 119 deletions
......@@ -47,7 +47,7 @@ sidecar:
# If specified, the sidecar will search for datasource config-maps inside this namespace.
# Otherwise the namespace in which the sidecar is running will be used.
# It's also possible to specify ALL to search in all namespaces
searchNamespace: default
searchNamespace: null
service:
......
......@@ -15,7 +15,7 @@ spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc2-application
image: "benediktwetzel/uc2-app:latest"
image: "soerenhenning/uc2-app:latest"
ports:
- containerPort: 5555
name: jmx
......
......@@ -15,7 +15,7 @@ spec:
terminationGracePeriodSeconds: 0
containers:
- name: workload-generator
image: benediktwetzel/uc2-wg:latest
image: soerenhenning/uc2-wg:latest
env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
......
......@@ -26,6 +26,9 @@ public final class LoadGenerator {
private LoadGenerator() {}
/**
* Entry point.
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
LOGGER.info("Start workload generator for use case UC1.");
......
package theodolite.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
......@@ -51,6 +51,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// build sensor registry
final MutableSensorRegistry sensorRegistry =
......@@ -69,6 +71,7 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
......@@ -43,6 +43,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender
final Properties kafkaProperties = new Properties();
......@@ -57,6 +59,7 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
......@@ -43,6 +43,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender
final Properties kafkaProperties = new Properties();
......@@ -57,6 +59,7 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
......@@ -2,13 +2,13 @@ package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.functions.Transport;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
......
......@@ -19,7 +19,7 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/*
/**
* The central class responsible for distributing the workload through all workload generators.
*/
public class WorkloadDistributor {
......@@ -30,6 +30,14 @@ public class WorkloadDistributor {
private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
// Curator retry strategy
private static final int BASE_SLEEP_TIME_MS = 2000;
private static final int MAX_RETRIES = 5;
// Wait time
private static final int MAX_WAIT_TIME = 20_000;
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
......@@ -39,7 +47,7 @@ public class WorkloadDistributor {
private final ZooKeeper zooKeeper;
private final CuratorFramework client;
private boolean workloadGenerationStarted = true;
private boolean workloadGenerationStarted = false;
/**
* Create a new workload distributor.
......@@ -63,7 +71,7 @@ public class WorkloadDistributor {
this.client = CuratorFrameworkFactory.builder()
.namespace(NAMESPACE)
.connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
.retryPolicy(new ExponentialBackoffRetry(2000, 5))
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
.build();
this.client.start();
......@@ -77,7 +85,7 @@ public class WorkloadDistributor {
this.counter =
new DistributedAtomicInteger(this.client, COUNTER_PATH,
new ExponentialBackoffRetry(2000, 5));
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
}
/**
......@@ -94,7 +102,11 @@ public class WorkloadDistributor {
final CuratorWatcher watcher = this.buildWatcher(workerId);
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
final Stat nodeExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
if (nodeExists == null) {
this.client.create().forPath(WORKLOAD_PATH);
}
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
......@@ -117,16 +129,19 @@ public class WorkloadDistributor {
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
final Stat exists =
final Stat definitionExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (exists != null) {
if (definitionExists != null) {
this.startWorkloadGeneration(workerId);
}
}
Thread.sleep(20_000);
Thread.sleep(MAX_WAIT_TIME);
if (!this.workloadGenerationStarted) {
LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
}
} catch (final Exception e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when starting the distribution of the workload.");
......@@ -137,7 +152,7 @@ public class WorkloadDistributor {
* Start the workload generation. This methods body does only get executed once.
*
* @param workerId the ID of this worker
* @throws Exception
* @throws Exception when an error occurs
*/
private synchronized void startWorkloadGeneration(final int workerId) throws Exception {
if (!this.workloadGenerationStarted) {
......
......@@ -21,6 +21,11 @@ import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Base for workload generators.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
implements WorkloadGenerator {
......@@ -51,19 +56,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
private final ScheduledExecutorService executor;
/**
* Start the workload generation. The generation terminates automatically after the specified
* {@code duration}.
* Create a new workload generator.
*
* @param instances the number of workload-generator instances.
* @param zooKeeper the zookeeper connection.
* @param keySpace the keyspace.
* @param threads the number of threads that is used to generate the load.
* @param period the period, how often a new record is emitted.
* @param duration the maximum runtime.
* @param beforeAction the action to perform before the workload generation starts.
* @param generatorFunction the function that is used to generate the individual records.
* @param transport the function that is used to send generated messages to the messaging system.
*/
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
public AbstractWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
......@@ -99,19 +103,25 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
this.executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
final int periodMs = period.getNano() / 1_000_000;
final int periodMs = (int) period.toMillis();
LOGGER.info("Period: " + periodMs);
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Generating records for {} keys.", entities.size());
LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> {
final T message = entity.generateMessage();
final long initialDelay = random.nextInt(periodMs);
this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
final Runnable task = () -> {
this.transport.transport(message);
};
this.executor.scheduleAtFixedRate(task, initialDelay,
periodMs, TimeUnit.MILLISECONDS);
});
......@@ -130,4 +140,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction,
workerAction);
}
/**
* Start the workload generation. The generation terminates automatically after the specified
* {@code duration}.
*/
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
}
......@@ -10,6 +10,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Workload generator for generating load for the kafka messaging system.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
extends AbstractWorkloadGenerator<T> {
......
......@@ -9,6 +9,11 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Builder for {@link workload generators}.
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
private int instances;
......@@ -45,7 +50,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
/**
* Set the number of instances.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @param instances the number of instances.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) {
......@@ -89,7 +94,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
/**
* Set the key space for the {@link KafkaWorkloadGenerator}.
*
* @param keySpace the {@link KeySpace}.
* @param threads the number of threads.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) {
......
......@@ -2,11 +2,17 @@ package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
/*
/**
* The central class that contains all information that needs to be exchanged between the nodes for
* distributed workload generation.
*/
public class WorkloadDefinition {
private static final int ZERO = 0;
private static final int ONE = 1;
private static final int TWO = 2;
private static final int THREE = 3;
private static final int FOUR = 4;
private final KeySpace keySpace;
private final int numberOfWorkers;
......@@ -52,12 +58,14 @@ public class WorkloadDefinition {
public static WorkloadDefinition fromString(final String workloadDefinitionString) {
final String[] deserialized = workloadDefinitionString.split(";");
if (deserialized.length != 4) {
if (deserialized.length != FOUR) {
throw new IllegalArgumentException(
"Wrong workload definition string when trying to parse the workload generation.");
}
return new WorkloadDefinition(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]),
Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3]));
return new WorkloadDefinition(
new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
Integer.valueOf(deserialized[TWO])),
Integer.valueOf(deserialized[THREE]));
}
}
......@@ -3,8 +3,10 @@ package theodolite.commons.workloadgeneration.misc;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
/*
/**
* Representation of a entity of the workload generation that generates load for one fixed key.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class WorkloadEntity<T extends IMonitoringRecord> {
private final String key;
......
package theodolite.commons.workloadgeneration.misc;
/*
/**
* Wrapper for connection information for ZooKeeper.
*/
public class ZooKeeper {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment