Commit f350b623 authored by Sören Henning

Migrate specific load generators to Hazelcast-based framework

parent 1b1fd27e
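
This commit replaces the hand-written Kafka workload generators of use cases UC2, UC3, and UC4 with the common, Hazelcast-based load generator framework in theodolite.commons.workloadgeneration. After the migration, a use-case-specific generator shrinks to little more than an entry point that delegates to the framework. The following is a minimal sketch of that shape, distilled from the UC3 and UC4 changes below (package and class names are taken from this commit; the comment on environment-based configuration is an assumption suggested by the fromEnvironment() factory):

package theodolite.uc3.workloadgenerator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Load generator for Theodolite use case UC3.
 */
public final class LoadGenerator {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

  private LoadGenerator() {}

  public static void main(final String[] args) {
    LOGGER.info("Start workload generator for use case UC3");
    // Kafka connection, key space, and generation period are presumably resolved
    // by the common framework, e.g. from environment variables.
    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
  }
}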
uc1 LoadGenerator.java:

@@ -5,7 +5,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
- * Load Generator for UC1.
+ * Load Generator for Theodolite use case UC1.
  */
 public final class LoadGenerator {
...
uc2 LoadGenerator.java (theodolite.uc2.workloadgenerator):

 package theodolite.uc2.workloadgenerator;

-import java.io.IOException;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
 import java.util.Objects;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.KafkaRecordSender;
 import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 import titan.ccp.configuration.events.Event;
-import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.sensorregistry.SensorRegistry;

 /**
- * The {@code LoadGenerator} creates a load in Kafka.
+ * Load generator for Theodolite use case UC2.
  */
 public final class LoadGenerator {
@@ -26,114 +16,50 @@ public final class LoadGenerator {

   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

-  // Constants
-  private static final String DEEP = "deep";
-  private static final long MAX_DURATION_IN_DAYS = 30L;
-
-  // Make this a utility class, because all methods are static.
-  private LoadGenerator() {
-    throw new UnsupportedOperationException();
-  }
+  private LoadGenerator() {}

   /**
-   * Main method.
-   *
-   * @param args CLI arguments
-   * @throws InterruptedException Interrupt happened
-   * @throws IOException happened.
+   * Start load generator.
    */
-  public static void main(final String[] args) throws InterruptedException, IOException {
-    // uc2
-    LOGGER.info("Start workload generator for use case UC2.");
-
-    // get environment variables
-    final String hierarchy = System.getenv("HIERARCHY");
-    if (hierarchy != null && hierarchy.equals(DEEP)) {
-      LOGGER.error(
-          "The HIERARCHY parameter is no longer supported. Creating a full hierachy instead.");
-    }
-    final int numNestedGroups = Integer
-        .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
-    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
-    final int zooKeeperPort =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
-    final int numSensors =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
-    final int periodMs =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final double value =
-        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final boolean sendRegistry = Boolean
-        .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
-    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
-            "localhost:9092");
-    final String schemaRegistryUrl =
-        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
-    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
-    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-    final int instances =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
+  public static void main(final String[] args) {
+    final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse(
+        System.getenv("SEND_REGISTRY"),
+        "true"));
+    final String kafkaBootstrapServers = Objects.requireNonNullElse(
+        System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
+        "localhost:9092");
+    final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
+        System.getenv("NUM_SENSORS"),
+        "1"));
+    final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse(
+        System.getenv("NUM_NESTED_GROUPS"),
+        "1"));

-    // build sensor registry
+    // Build sensor hierarchy
     final SensorRegistry sensorRegistry =
         new SensorRegistryBuilder(numNestedGroups, numSensors).build();

-    // create kafka record sender
-    final Properties kafkaProperties = new Properties();
-    // kafkaProperties.put("acks", this.acknowledges);
-    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
-    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
-    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        KafkaRecordSender.<ActivePowerRecord>builder(
-            kafkaBootstrapServers,
-            kafkaInputTopic,
-            schemaRegistryUrl)
-            .keyAccessor(r -> r.getIdentifier())
-            .timestampAccessor(r -> r.getTimestamp())
-            .defaultProperties(kafkaProperties)
-            .build();
-
-    // create workload generator
-    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
-        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
-            .instances(instances)
-            .keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
-            .threads(threads)
-            .period(Duration.of(periodMs, ChronoUnit.MILLIS))
-            .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
-            .beforeAction(() -> {
-              if (sendRegistry) {
-                final ConfigPublisher configPublisher =
-                    new ConfigPublisher(kafkaBootstrapServers, "configuration");
-                configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
-                configPublisher.close();
-                LOGGER.info("Configuration sent.");
-
-                LOGGER.info("Now wait 30 seconds");
-                try {
-                  Thread.sleep(SLEEP_PERIOD);
-                } catch (final InterruptedException e) {
-                  // TODO Auto-generated catch block
-                  LOGGER.error(e.getMessage(), e);
-                }
-                LOGGER.info("And woke up again :)");
-              }
-            })
-            .generatorFunction(
-                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
-            .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
-            .kafkaRecordSender(kafkaRecordSender)
-            .build();
-
-    // start
-    workloadGenerator.start();
+    LOGGER.info("Start workload generator for use case UC2");
+    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment()
+        .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
+        .withBeforeAction(() -> {
+          if (sendRegistry) {
+            final ConfigPublisher configPublisher =
+                new ConfigPublisher(kafkaBootstrapServers, "configuration");
+            configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
+            configPublisher.close();
+            LOGGER.info("Configuration sent.");
+
+            LOGGER.info("Now wait 30 seconds...");
+            try {
+              Thread.sleep(SLEEP_PERIOD);
+            } catch (final InterruptedException e) {
+              LOGGER.error(e.getMessage(), e);
+            }
+            LOGGER.info("...and start generating load.");
+          }
+        })
+        .run();
   }
 }
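
UC2 is the only use case that keeps custom logic after the migration: it overrides the key space with the size of the generated sensor registry and registers a before-action that publishes the registry to the configuration topic before load generation starts. The same hook can be reused by any generator that needs one-off setup; a hypothetical example (not part of this commit, API names taken from the diff above):

// Hypothetical reuse of the before-action hook; the setup logic in the lambda
// is illustrative only and not part of this commit.
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment()
    .withBeforeAction(() -> LOGGER.info("Perform one-off setup before generating load."))
    .run();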
uc3 LoadGenerator.java (theodolite.uc3.workloadgenerator):

 package theodolite.uc3.workloadgenerator;

-import java.io.IOException;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.Objects;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-import titan.ccp.model.records.ActivePowerRecord;

 /**
- * The {@code LoadGenerator} creates a load in Kafka.
+ * Load generator for Theodolite use case UC3.
  */
 public final class LoadGenerator {

   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

-  // constants
-  private static final long MAX_DURATION_IN_DAYS = 30L;
-
-  // Make this a utility class, because all methods are static.
-  private LoadGenerator() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Main method.
-   *
-   * @param args CLI arguments
-   * @throws InterruptedException Interrupt happened
-   * @throws IOException happened.
-   */
-  public static void main(final String[] args) throws InterruptedException, IOException {
-    // uc2
-    LOGGER.info("Start workload generator for use case UC3.");
-
-    // get environment variables
-    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
-    final int zooKeeperPort =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
-    final int numSensors =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
-    final int periodMs =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final double value =
-        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
-            "localhost:9092");
-    final String schemaRegistryUrl =
-        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
-    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
-    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-    final int instances =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
-
-    // create kafka record sender
-    final Properties kafkaProperties = new Properties();
-    // kafkaProperties.put("acks", this.acknowledges);
-    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
-    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
-    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        KafkaRecordSender.<ActivePowerRecord>builder(
-            kafkaBootstrapServers,
-            kafkaInputTopic,
-            schemaRegistryUrl)
-            .keyAccessor(r -> r.getIdentifier())
-            .timestampAccessor(r -> r.getTimestamp())
-            .defaultProperties(kafkaProperties)
-            .build();
-
-    // create workload generator
-    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
-        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
-            .instances(instances)
-            .keySpace(new KeySpace("s_", numSensors))
-            .threads(threads)
-            .period(Duration.of(periodMs, ChronoUnit.MILLIS))
-            .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
-            .generatorFunction(
-                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
-            .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
-            .kafkaRecordSender(kafkaRecordSender)
-            .build();
-
-    // start
-    workloadGenerator.start();
+  private LoadGenerator() {}
+
+  public static void main(final String[] args) {
+    LOGGER.info("Start workload generator for use case UC3");
+    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
   }
 }
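
Note how all explicit configuration parsing disappears from UC3 (and from UC4 below): the removed code read variables such as NUM_SENSORS, PERIOD_MS, VALUE, and KAFKA_BOOTSTRAP_SERVERS by hand, whereas the migrated version delegates this to LoadGenerator.fromEnvironment(). For reference, this is the null-safe default pattern the removed generators used; whether the common framework reads the same variable names is not shown in this diff and remains an assumption:

// Pattern used by the removed generators to read configuration with defaults.
final String kafkaBootstrapServers = Objects.requireNonNullElse(
    System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final int numSensors = Integer.parseInt(
    Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs = Integer.parseInt(
    Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));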
uc4 LoadGenerator.java (theodolite.uc4.workloadgenerator):

 package theodolite.uc4.workloadgenerator;

-import java.io.IOException;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.Objects;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
-import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-import titan.ccp.model.records.ActivePowerRecord;

 /**
- * The {@code LoadGenerator} creates a load in Kafka.
+ * Load generator for Theodolite use case UC4.
  */
 public final class LoadGenerator {

   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

-  // constants
-  private static final long MAX_DURATION_IN_DAYS = 30L;
-
-  // Make this a utility class, because all methods are static.
   private LoadGenerator() {
     throw new UnsupportedOperationException();
   }

-  /**
-   * Main method.
-   *
-   * @param args CLI arguments
-   * @throws InterruptedException Interrupt happened
-   * @throws IOException happened.
-   */
-  public static void main(final String[] args) throws InterruptedException, IOException {
-    // uc4
-    LOGGER.info("Start workload generator for use case UC4.");
-
-    // get environment variables
-    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
-    final int zooKeeperPort =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
-    final int numSensors =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
-    final int periodMs =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final double value =
-        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1"));
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
-            "localhost:9092");
-    final String schemaRegistryUrl =
-        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
-    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
-    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-    final int instances =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
-
-    // create kafka record sender
-    final Properties kafkaProperties = new Properties();
-    // kafkaProperties.put("acks", this.acknowledges);
-    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
-    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
-    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        KafkaRecordSender.<ActivePowerRecord>builder(
-            kafkaBootstrapServers,
-            kafkaInputTopic,
-            schemaRegistryUrl)
-            .keyAccessor(r -> r.getIdentifier())
-            .timestampAccessor(r -> r.getTimestamp())
-            .defaultProperties(kafkaProperties)
-            .build();
-
-    // create workload generator
-    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
-        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
-            .instances(instances)
-            .keySpace(new KeySpace("s_", numSensors))
-            .threads(threads)
-            .period(Duration.of(periodMs, ChronoUnit.MILLIS))
-            .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
-            .generatorFunction(
-                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
-            .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
-            .kafkaRecordSender(kafkaRecordSender)
-            .build();
-
-    // start
-    workloadGenerator.start();
+  public static void main(final String[] args) {
+    LOGGER.info("Start workload generator for use case UC4");
+    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
   }
 }
LoadGenerator.java (theodolite.commons.workloadgeneration, common framework):

@@ -43,6 +43,11 @@ public final class LoadGenerator {
     return this;
   }

+  public LoadGenerator withKeySpace(final KeySpace keySpace) {
+    this.loadDefinition = new WorkloadDefinition(keySpace, this.loadDefinition.getPeriod());
+    return this;
+  }
+
   public LoadGenerator withBeforeAction(final BeforeAction beforeAction) {
     this.generatorConfig.setBeforeAction(beforeAction);
     return this;
...
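
The new withKeySpace(...) setter lets a use case replace the key space of the already-built WorkloadDefinition while keeping its period, which is exactly how UC2 sizes the key space from its generated sensor registry. A minimal usage sketch (the key count of 100 is an example value, not from this commit):

// Example: override the key space derived from the environment with a fixed one.
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment()
    .withKeySpace(new KeySpace("s_", 100))
    .run();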