Skip to content
Snippets Groups Projects
Commit 8cc23263 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Cleanup and restructuring code

parent 393adf9d
No related branches found
No related tags found
No related merge requests found
Showing
with 103 additions and 119 deletions
...@@ -47,7 +47,7 @@ sidecar: ...@@ -47,7 +47,7 @@ sidecar:
# If specified, the sidecar will search for datasource config-maps inside this namespace. # If specified, the sidecar will search for datasource config-maps inside this namespace.
# Otherwise the namespace in which the sidecar is running will be used. # Otherwise the namespace in which the sidecar is running will be used.
# It's also possible to specify ALL to search in all namespaces # It's also possible to specify ALL to search in all namespaces
searchNamespace: default searchNamespace: null
service: service:
......
...@@ -15,7 +15,7 @@ spec: ...@@ -15,7 +15,7 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: uc2-application - name: uc2-application
image: "benediktwetzel/uc2-app:latest" image: "soerenhenning/uc2-app:latest"
ports: ports:
- containerPort: 5555 - containerPort: 5555
name: jmx name: jmx
......
...@@ -15,7 +15,7 @@ spec: ...@@ -15,7 +15,7 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: benediktwetzel/uc2-wg:latest image: soerenhenning/uc2-wg:latest
env: env:
- name: ZK_HOST - name: ZK_HOST
value: "my-confluent-cp-zookeeper" value: "my-confluent-cp-zookeeper"
......
...@@ -26,6 +26,9 @@ public final class LoadGenerator { ...@@ -26,6 +26,9 @@ public final class LoadGenerator {
private LoadGenerator() {} private LoadGenerator() {}
/**
* Entry point.
*/
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc1 // uc1
LOGGER.info("Start workload generator for use case UC1."); LOGGER.info("Start workload generator for use case UC1.");
......
package theodolite.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
...@@ -51,6 +51,8 @@ public class LoadGenerator { ...@@ -51,6 +51,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// build sensor registry // build sensor registry
final MutableSensorRegistry sensorRegistry = final MutableSensorRegistry sensorRegistry =
...@@ -69,6 +71,7 @@ public class LoadGenerator { ...@@ -69,6 +71,7 @@ public class LoadGenerator {
// create workload generator // create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
...@@ -43,6 +43,8 @@ public class LoadGenerator { ...@@ -43,6 +43,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender // create kafka record sender
final Properties kafkaProperties = new Properties(); final Properties kafkaProperties = new Properties();
...@@ -57,6 +59,7 @@ public class LoadGenerator { ...@@ -57,6 +59,7 @@ public class LoadGenerator {
// create workload generator // create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
...@@ -43,6 +43,8 @@ public class LoadGenerator { ...@@ -43,6 +43,8 @@ public class LoadGenerator {
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender // create kafka record sender
final Properties kafkaProperties = new Properties(); final Properties kafkaProperties = new Properties();
...@@ -57,6 +59,7 @@ public class LoadGenerator { ...@@ -57,6 +59,7 @@ public class LoadGenerator {
// create workload generator // create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
......
...@@ -2,13 +2,13 @@ package theodolite.commons.workloadgeneration.communication.kafka; ...@@ -2,13 +2,13 @@ package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties; import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.functions.Transport; import theodolite.commons.workloadgeneration.functions.Transport;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
......
...@@ -19,7 +19,7 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction; ...@@ -19,7 +19,7 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper; import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/* /**
* The central class responsible for distributing the workload through all workload generators. * The central class responsible for distributing the workload through all workload generators.
*/ */
public class WorkloadDistributor { public class WorkloadDistributor {
...@@ -30,6 +30,14 @@ public class WorkloadDistributor { ...@@ -30,6 +30,14 @@ public class WorkloadDistributor {
private static final String COUNTER_PATH = "/counter"; private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload"; private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
// Curator retry strategy
private static final int BASE_SLEEP_TIME_MS = 2000;
private static final int MAX_RETRIES = 5;
// Wait time
private static final int MAX_WAIT_TIME = 20_000;
private final DistributedAtomicInteger counter; private final DistributedAtomicInteger counter;
private final KeySpace keySpace; private final KeySpace keySpace;
private final BeforeAction beforeAction; private final BeforeAction beforeAction;
...@@ -39,7 +47,7 @@ public class WorkloadDistributor { ...@@ -39,7 +47,7 @@ public class WorkloadDistributor {
private final ZooKeeper zooKeeper; private final ZooKeeper zooKeeper;
private final CuratorFramework client; private final CuratorFramework client;
private boolean workloadGenerationStarted = true; private boolean workloadGenerationStarted = false;
/** /**
* Create a new workload distributor. * Create a new workload distributor.
...@@ -63,7 +71,7 @@ public class WorkloadDistributor { ...@@ -63,7 +71,7 @@ public class WorkloadDistributor {
this.client = CuratorFrameworkFactory.builder() this.client = CuratorFrameworkFactory.builder()
.namespace(NAMESPACE) .namespace(NAMESPACE)
.connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort()) .connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
.retryPolicy(new ExponentialBackoffRetry(2000, 5)) .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
.build(); .build();
this.client.start(); this.client.start();
...@@ -77,7 +85,7 @@ public class WorkloadDistributor { ...@@ -77,7 +85,7 @@ public class WorkloadDistributor {
this.counter = this.counter =
new DistributedAtomicInteger(this.client, COUNTER_PATH, new DistributedAtomicInteger(this.client, COUNTER_PATH,
new ExponentialBackoffRetry(2000, 5)); new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
} }
/** /**
...@@ -94,7 +102,11 @@ public class WorkloadDistributor { ...@@ -94,7 +102,11 @@ public class WorkloadDistributor {
final CuratorWatcher watcher = this.buildWatcher(workerId); final CuratorWatcher watcher = this.buildWatcher(workerId);
final Stat nodeExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH); this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
if (nodeExists == null) {
this.client.create().forPath(WORKLOAD_PATH);
}
if (workerId == 0) { if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId); LOGGER.info("This instance is master with id {}", workerId);
...@@ -117,16 +129,19 @@ public class WorkloadDistributor { ...@@ -117,16 +129,19 @@ public class WorkloadDistributor {
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
final Stat exists = final Stat definitionExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (exists != null) { if (definitionExists != null) {
this.startWorkloadGeneration(workerId); this.startWorkloadGeneration(workerId);
} }
} }
Thread.sleep(20_000); Thread.sleep(MAX_WAIT_TIME);
if (!this.workloadGenerationStarted) {
LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
}
} catch (final Exception e) { } catch (final Exception e) {
LOGGER.error("", e); LOGGER.error("", e);
throw new IllegalStateException("Error when starting the distribution of the workload."); throw new IllegalStateException("Error when starting the distribution of the workload.");
...@@ -137,7 +152,7 @@ public class WorkloadDistributor { ...@@ -137,7 +152,7 @@ public class WorkloadDistributor {
* Start the workload generation. This methods body does only get executed once. * Start the workload generation. This methods body does only get executed once.
* *
* @param workerId the ID of this worker * @param workerId the ID of this worker
* @throws Exception * @throws Exception when an error occurs
*/ */
private synchronized void startWorkloadGeneration(final int workerId) throws Exception { private synchronized void startWorkloadGeneration(final int workerId) throws Exception {
if (!this.workloadGenerationStarted) { if (!this.workloadGenerationStarted) {
......
...@@ -21,6 +21,11 @@ import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; ...@@ -21,6 +21,11 @@ import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.WorkloadEntity; import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
import theodolite.commons.workloadgeneration.misc.ZooKeeper; import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Base for workload generators.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
implements WorkloadGenerator { implements WorkloadGenerator {
...@@ -51,19 +56,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> ...@@ -51,19 +56,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
private final ScheduledExecutorService executor; private final ScheduledExecutorService executor;
/** /**
* Start the workload generation. The generation terminates automatically after the specified * Create a new workload generator.
* {@code duration}. *
* @param instances the number of workload-generator instances.
* @param zooKeeper the zookeeper connection.
* @param keySpace the keyspace.
* @param threads the number of threads that is used to generate the load.
* @param period the period, how often a new record is emitted.
* @param duration the maximum runtime.
* @param beforeAction the action to perform before the workload generation starts.
* @param generatorFunction the function that is used to generate the individual records.
* @param transport the function that is used to send generated messages to the messaging system.
*/ */
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
public AbstractWorkloadGenerator( public AbstractWorkloadGenerator(
final int instances, final int instances,
final ZooKeeper zooKeeper, final ZooKeeper zooKeeper,
...@@ -99,19 +103,25 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> ...@@ -99,19 +103,25 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
this.executor = Executors.newScheduledThreadPool(threads); this.executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random(); final Random random = new Random();
final int periodMs = period.getNano() / 1_000_000; final int periodMs = (int) period.toMillis();
LOGGER.info("Period: " + periodMs);
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> { final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId); final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
LOGGER.info("Beginning of Experiment..."); LOGGER.info("Beginning of Experiment...");
LOGGER.info("Generating records for {} keys.", entities.size());
LOGGER.info("Experiment is going to be executed for the specified duration..."); LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> { entities.forEach(entity -> {
final T message = entity.generateMessage(); final T message = entity.generateMessage();
final long initialDelay = random.nextInt(periodMs); final long initialDelay = random.nextInt(periodMs);
this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay, final Runnable task = () -> {
this.transport.transport(message);
};
this.executor.scheduleAtFixedRate(task, initialDelay,
periodMs, TimeUnit.MILLISECONDS); periodMs, TimeUnit.MILLISECONDS);
}); });
...@@ -130,4 +140,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> ...@@ -130,4 +140,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction, new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction,
workerAction); workerAction);
} }
/**
* Start the workload generation. The generation terminates automatically after the specified
* {@code duration}.
*/
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
} }
...@@ -10,6 +10,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; ...@@ -10,6 +10,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/** /**
* Workload generator for generating load for the kafka messaging system. * Workload generator for generating load for the kafka messaging system.
*
* @param <T> The type of records the workload generator is dedicated for.
*/ */
public class KafkaWorkloadGenerator<T extends IMonitoringRecord> public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
extends AbstractWorkloadGenerator<T> { extends AbstractWorkloadGenerator<T> {
......
...@@ -9,6 +9,11 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction; ...@@ -9,6 +9,11 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator; import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper; import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Builder for {@link workload generators}.
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
private int instances; private int instances;
...@@ -45,7 +50,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { ...@@ -45,7 +50,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
/** /**
* Set the number of instances. * Set the number of instances.
* *
* @param zooKeeper a reference to the ZooKeeper instance. * @param instances the number of instances.
* @return the builder. * @return the builder.
*/ */
public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) { public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) {
...@@ -89,7 +94,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { ...@@ -89,7 +94,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
/** /**
* Set the key space for the {@link KafkaWorkloadGenerator}. * Set the key space for the {@link KafkaWorkloadGenerator}.
* *
* @param keySpace the {@link KeySpace}. * @param threads the number of threads.
* @return the builder. * @return the builder.
*/ */
public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) { public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) {
......
...@@ -2,11 +2,17 @@ package theodolite.commons.workloadgeneration.misc; ...@@ -2,11 +2,17 @@ package theodolite.commons.workloadgeneration.misc;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
/* /**
* The central class that contains all information that needs to be exchanged between the nodes for * The central class that contains all information that needs to be exchanged between the nodes for
* distributed workload generation. * distributed workload generation.
*/ */
public class WorkloadDefinition { public class WorkloadDefinition {
private static final int ZERO = 0;
private static final int ONE = 1;
private static final int TWO = 2;
private static final int THREE = 3;
private static final int FOUR = 4;
private final KeySpace keySpace; private final KeySpace keySpace;
private final int numberOfWorkers; private final int numberOfWorkers;
...@@ -52,12 +58,14 @@ public class WorkloadDefinition { ...@@ -52,12 +58,14 @@ public class WorkloadDefinition {
public static WorkloadDefinition fromString(final String workloadDefinitionString) { public static WorkloadDefinition fromString(final String workloadDefinitionString) {
final String[] deserialized = workloadDefinitionString.split(";"); final String[] deserialized = workloadDefinitionString.split(";");
if (deserialized.length != 4) { if (deserialized.length != FOUR) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Wrong workload definition string when trying to parse the workload generation."); "Wrong workload definition string when trying to parse the workload generation.");
} }
return new WorkloadDefinition(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]), return new WorkloadDefinition(
Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3])); new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
Integer.valueOf(deserialized[TWO])),
Integer.valueOf(deserialized[THREE]));
} }
} }
...@@ -3,8 +3,10 @@ package theodolite.commons.workloadgeneration.misc; ...@@ -3,8 +3,10 @@ package theodolite.commons.workloadgeneration.misc;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.functions.MessageGenerator; import theodolite.commons.workloadgeneration.functions.MessageGenerator;
/* /**
* Representation of a entity of the workload generation that generates load for one fixed key. * Representation of a entity of the workload generation that generates load for one fixed key.
*
* @param <T> The type of records the workload generator is dedicated for.
*/ */
public class WorkloadEntity<T extends IMonitoringRecord> { public class WorkloadEntity<T extends IMonitoringRecord> {
private final String key; private final String key;
......
package theodolite.commons.workloadgeneration.misc; package theodolite.commons.workloadgeneration.misc;
/* /**
* Wrapper for connection information for ZooKeeper. * Wrapper for connection information for ZooKeeper.
*/ */
public class ZooKeeper { public class ZooKeeper {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment