diff --git a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index a5dc622a5102aaa33eb1604f1bf7bfa8a3b61680..26741eb33b2a8d1c23a40938d1261254ac37b636 100644 --- a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -5,7 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Load Generator for UC1. + * Load Generator for Theodolite use case UC1. */ public final class LoadGenerator { diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 3fbde336f18e71036700b5241e3307437db810a1..826387c484455fed4a7accb5dda56a66a4b63713 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,24 +1,14 @@ package theodolite.uc2.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KafkaRecordSender; import theodolite.commons.workloadgeneration.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.configuration.events.Event; -import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.sensorregistry.SensorRegistry; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC2. */ public final class LoadGenerator { @@ -26,114 +16,50 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // Constants - private static final String DEEP = "deep"; - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. - private LoadGenerator() { - throw new UnsupportedOperationException(); - } + private LoadGenerator() {} /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. + * Start load generator. */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc2 - LOGGER.info("Start workload generator for use case UC2."); - - // get environment variables - final String hierarchy = System.getenv("HIERARCHY"); - if (hierarchy != null && hierarchy.equals(DEEP)) { - LOGGER.error( - "The HIERARCHY parameter is no longer supported. Creating a full hierachy instead."); - } - final int numNestedGroups = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = Boolean - .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); + public static void main(final String[] args) { + final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv("SEND_REGISTRY"), + "true")); + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_SENSORS"), + "1")); + final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_NESTED_GROUPS"), + "1")); - // build sensor registry + // Build sensor hierarchy final SensorRegistry sensorRegistry = new SensorRegistryBuilder(numNestedGroups, numSensors).build(); - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - KafkaRecordSender.<ActivePowerRecord>builder( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .beforeAction(() -> { - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - LOGGER.info("Configuration sent."); - - LOGGER.info("Now wait 30 seconds"); - try { - Thread.sleep(SLEEP_PERIOD); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.error(e.getMessage(), e); - } - LOGGER.info("And woke up again :)"); - } - }) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); + LOGGER.info("Start workload generator for use case UC2"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() + .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) + .withBeforeAction(() -> { + if (sendRegistry) { + final ConfigPublisher configPublisher = + new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + LOGGER.info("Configuration sent."); - // start - workloadGenerator.start(); + LOGGER.info("Now wait 30 seconds..."); + try { + Thread.sleep(SLEEP_PERIOD); + } catch (final InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + LOGGER.info("...and start generating load."); + } + }) + .run(); } } diff --git a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 41cdc4a19e09b9042a6f171aef49ff514ccc4c91..662113fd1ae76e64d13933a01d18d9d08e950613 100644 --- a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -1,102 +1,19 @@ package theodolite.uc3.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KafkaRecordSender; -import theodolite.commons.workloadgeneration.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC3. */ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // constants - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. - private LoadGenerator() { - throw new UnsupportedOperationException(); - } - - /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. - */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc2 - LOGGER.info("Start workload generator for use case UC3."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - KafkaRecordSender.<ActivePowerRecord>builder( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + private LoadGenerator() {} + public static void main(final String[] args) { + LOGGER.info("Start workload generator for use case UC3"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } } diff --git a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 892498e1737e0a9b5187c35b8d88ec44e1ff0dc7..c0d885ed1730e0d658a7d176d21d7c57529c55b0 100644 --- a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,103 +1,22 @@ package theodolite.uc4.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KafkaRecordSender; -import theodolite.commons.workloadgeneration.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC4. */ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // constants - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. private LoadGenerator() { throw new UnsupportedOperationException(); } - /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. - */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc4 - LOGGER.info("Start workload generator for use case UC4."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - KafkaRecordSender.<ActivePowerRecord>builder( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + public static void main(final String[] args) { + LOGGER.info("Start workload generator for use case UC4"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index ed5dd116c12ce76d800e904c8534282a0823239b..685f4e5b65c73888e8eecab149cddc4bb9aec2b8 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -43,6 +43,11 @@ public final class LoadGenerator { return this; } + public LoadGenerator withKeySpace(final KeySpace keySpace) { + this.loadDefinition = new WorkloadDefinition(keySpace, this.loadDefinition.getPeriod()); + return this; + } + public LoadGenerator withBeforeAction(final BeforeAction beforeAction) { this.generatorConfig.setBeforeAction(beforeAction); return this;