diff --git a/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891754324a6e1bf55348b6a38f592bb301..4d01df75552c562406705858b6368ecf59d6e82f 100644 --- a/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true @@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates= sp_cleanup.add_default_serial_version_id=true sp_cleanup.add_generated_serial_version_id=false sp_cleanup.add_missing_annotations=true diff --git a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index a7b27dfdb25760f0b96c930c9705c2eed0402442..26741eb33b2a8d1c23a40938d1261254ac37b636 100644 --- a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -1,94 +1,23 @@ package theodolite.uc1.workloadgenerator; import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** - * Load Generator for UC1. + * Load Generator for Theodolite use case UC1. */ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private static final long MAX_DURATION_IN_DAYS = 30L; - private LoadGenerator() {} /** - * Entry point. + * Start load generator for use case UC1. */ public static void main(final String[] args) throws InterruptedException, IOException { - // uc1 LOGGER.info("Start workload generator for use case UC1."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), - "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender.Builder<ActivePowerRecord>( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } } diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 3eb3e8d25b1f1aa6f302673727b8457a744fb503..826387c484455fed4a7accb5dda56a66a4b63713 100644 --- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,24 +1,14 @@ package theodolite.uc2.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; +import theodolite.commons.workloadgeneration.KeySpace; import titan.ccp.configuration.events.Event; -import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.sensorregistry.SensorRegistry; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC2. */ public final class LoadGenerator { @@ -26,114 +16,50 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // Constants - private static final String DEEP = "deep"; - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. - private LoadGenerator() { - throw new UnsupportedOperationException(); - } + private LoadGenerator() {} /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. + * Start load generator. */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc2 - LOGGER.info("Start workload generator for use case UC2."); - - // get environment variables - final String hierarchy = System.getenv("HIERARCHY"); - if (hierarchy != null && hierarchy.equals(DEEP)) { - LOGGER.error( - "The HIERARCHY parameter is no longer supported. Creating a full hierachy instead."); - } - final int numNestedGroups = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = Boolean - .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); + public static void main(final String[] args) { + final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv("SEND_REGISTRY"), + "true")); + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_SENSORS"), + "1")); + final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse( + System.getenv("NUM_NESTED_GROUPS"), + "1")); - // build sensor registry + // Build sensor hierarchy final SensorRegistry sensorRegistry = new SensorRegistryBuilder(numNestedGroups, numSensors).build(); - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender.Builder<ActivePowerRecord>( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .beforeAction(() -> { - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - LOGGER.info("Configuration sent."); - - LOGGER.info("Now wait 30 seconds"); - try { - Thread.sleep(SLEEP_PERIOD); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.error(e.getMessage(), e); - } - LOGGER.info("And woke up again :)"); - } - }) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); + LOGGER.info("Start workload generator for use case UC2"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment() + .withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) + .withBeforeAction(() -> { + if (sendRegistry) { + final ConfigPublisher configPublisher = + new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + LOGGER.info("Configuration sent."); - // start - workloadGenerator.start(); + LOGGER.info("Now wait 30 seconds..."); + try { + Thread.sleep(SLEEP_PERIOD); + } catch (final InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + LOGGER.info("...and start generating load."); + } + }) + .run(); } } diff --git a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 85f6a94036c53b48973ba2200212fc8e5dfd663d..662113fd1ae76e64d13933a01d18d9d08e950613 100644 --- a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -1,102 +1,19 @@ package theodolite.uc3.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC3. */ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // constants - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. - private LoadGenerator() { - throw new UnsupportedOperationException(); - } - - /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. - */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc2 - LOGGER.info("Start workload generator for use case UC3."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender.Builder<ActivePowerRecord>( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + private LoadGenerator() {} + public static void main(final String[] args) { + LOGGER.info("Start workload generator for use case UC3"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } } diff --git a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index ff551e7ef423633137d122dfed7d6e03d362e7ff..c0d885ed1730e0d658a7d176d21d7c57529c55b0 100644 --- a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,103 +1,22 @@ package theodolite.uc4.workloadgenerator; -import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** - * The {@code LoadGenerator} creates a load in Kafka. + * Load generator for Theodolite use case UC4. */ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - // constants - private static final long MAX_DURATION_IN_DAYS = 30L; - - // Make this a utility class, because all methods are static. private LoadGenerator() { throw new UnsupportedOperationException(); } - /** - * Main method. - * - * @param args CLI arguments - * @throws InterruptedException Interrupt happened - * @throws IOException happened. - */ - public static void main(final String[] args) throws InterruptedException, IOException { - // uc4 - LOGGER.info("Start workload generator for use case UC4."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender.Builder<ActivePowerRecord>( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + public static void main(final String[] args) { + LOGGER.info("Start workload generator for use case UC4"); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } } diff --git a/benchmarks/workload-generator-commons/build.gradle b/benchmarks/workload-generator-commons/build.gradle index eef987cd444c3b6c3d8a532c8d192e94311176db..98d820b480ba0b357b74f82ebce5a647ee392461 100644 --- a/benchmarks/workload-generator-commons/build.gradle +++ b/benchmarks/workload-generator-commons/build.gradle @@ -1,3 +1,5 @@ dependencies { - implementation 'org.apache.curator:curator-recipes:4.3.0' + implementation 'com.google.guava:guava:30.1-jre' + implementation 'com.hazelcast:hazelcast:4.1.1' + implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1' } \ No newline at end of file diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/BeforeAction.java similarity index 57% rename from benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java rename to benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/BeforeAction.java index 7914a4985b6df40f7146c1fd681d1fba063f8b98..56af95d70f762095a6fe090457b7d4b473a43b1a 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/BeforeAction.java @@ -1,4 +1,4 @@ -package theodolite.commons.workloadgeneration.functions; +package theodolite.commons.workloadgeneration; /** * Describes the before action which is executed before every sub experiment. @@ -8,4 +8,9 @@ public interface BeforeAction { public void run(); + public static BeforeAction doNothing() { + return () -> { + }; + } + } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..9d84dc67461f98fabdee4c8e0784ad7394d7f108 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java @@ -0,0 +1,76 @@ +package theodolite.commons.workloadgeneration; + +/** + * Configuration of a load generator cluster. + */ +public final class ClusterConfig { + + private static final int PORT_DEFAULT = 5701; + private static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation"; + + private final String bootstrapServer; + private final String kubernetesDnsName; + private int port = PORT_DEFAULT; + private boolean portAutoIncrement = true; + private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT; + + /** + * Create a new {@link ClusterConfig} with the given parameter values. + */ + private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) { + this.bootstrapServer = bootstrapServer; + this.kubernetesDnsName = kubernetesDnsName; + } + + public boolean hasBootstrapServer() { + return this.bootstrapServer != null; + } + + public String getBootstrapServer() { + return this.bootstrapServer; + } + + public boolean hasKubernetesDnsName() { + return this.kubernetesDnsName != null; + } + + public String getKubernetesDnsName() { + return this.kubernetesDnsName; + } + + public int getPort() { + return this.port; + } + + public boolean isPortAutoIncrement() { + return this.portAutoIncrement; + } + + public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD + this.portAutoIncrement = portAutoIncrement; + return this; + } + + public ClusterConfig setPort(final int port) { // NOPMD + this.port = port; + return this; + } + + public String getClusterNamePrefix() { + return this.clusterNamePrefix; + } + + public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD + this.clusterNamePrefix = clusterNamePrefix; + return this; + } + + public static ClusterConfig fromBootstrapServer(final String bootstrapServer) { + return new ClusterConfig(bootstrapServer, null); + } + + public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) { + return new ClusterConfig(null, kubernetesDnsName); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..45ac1d5bb9c21a1b6303de2f248d08b69c02fc28 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -0,0 +1,40 @@ +package theodolite.commons.workloadgeneration; + +/** + * Keys to access configuration parameters. + */ +public final class ConfigurationKeys { + + public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; + + public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME"; + + public static final String PORT = "PORT"; + + public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; + + public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; + + public static final String NUM_SENSORS = "NUM_SENSORS"; + + public static final String PERIOD_MS = "PERIOD_MS"; + + public static final String VALUE = "VALUE"; + + public static final String THREADS = "THREADS"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; + + public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; + + public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC"; + + public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE"; + + public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS"; + + public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; + + private ConfigurationKeys() {} + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java new file mode 100644 index 0000000000000000000000000000000000000000..c010492950c5caace9ff85baefee1af4e46d25bb --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java @@ -0,0 +1,107 @@ +package theodolite.commons.workloadgeneration; + +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; +import com.hazelcast.cluster.MembershipListener; +import com.hazelcast.config.Config; +import com.hazelcast.config.JoinConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * A Theodolite load generator runner that establishes a cluster using Hazelcast. + */ +public class HazelcastRunner { + + private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; + private final HazelcastInstance hzInstance; + private volatile HazelcastRunnerStateInstance runnerState; + private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); + private final LoadGeneratorConfig loadConfig; + private final WorkloadDefinition totalLoadDefinition; + + /** + * Create a new {@link HazelcastRunner} from the given configuration. + */ + public HazelcastRunner( + final ClusterConfig clusterConfig, + final LoadGeneratorConfig loadConfig, + final WorkloadDefinition totalLoadDefinition) { + this.loadConfig = loadConfig; + this.totalLoadDefinition = totalLoadDefinition; + this.hzInstance = buildhazelcastInstance(clusterConfig, totalLoadDefinition.toString()); + this.hzInstance.getCluster().addMembershipListener(new RunnerMembershipListener()); + } + + /** + * Start the workload generation and blocks until the workload generation is stopped again. + */ + public void runBlocking() { + while (!this.stopAction.isDone()) { + synchronized (this) { + final Set<Member> members = this.hzInstance.getCluster().getMembers(); + this.runnerState = new HazelcastRunnerStateInstance( + this.loadConfig, + this.totalLoadDefinition, + this.hzInstance, members); + } + this.runnerState.runBlocking(); + } + } + + public void restart() { + this.stopRunnerState(); + } + + public void stop() { + this.stopAction.complete(null); + this.stopRunnerState(); + } + + private void stopRunnerState() { + synchronized (this) { + if (this.runnerState != null) { + this.runnerState.stopAsync(); + } + } + } + + private class RunnerMembershipListener implements MembershipListener { + + @Override + public void memberAdded(final MembershipEvent membershipEvent) { + HazelcastRunner.this.restart(); + } + + @Override + public void memberRemoved(final MembershipEvent membershipEvent) { + HazelcastRunner.this.restart(); + } + + } + + private static HazelcastInstance buildhazelcastInstance( + final ClusterConfig cluster, + final String clusterName) { + final Config config = new Config() + .setClusterName(cluster.getClusterNamePrefix() + '_' + clusterName); + + final JoinConfig joinConfig = config.getNetworkConfig() + .setPort(cluster.getPort()) + .setPortAutoIncrement(cluster.isPortAutoIncrement()) + .getJoin(); + joinConfig.getMulticastConfig().setEnabled(false); + if (cluster.hasBootstrapServer()) { + joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer()); + } else if (cluster.hasKubernetesDnsName()) { + joinConfig.getKubernetesConfig() + .setEnabled(true) + .setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, cluster.getKubernetesDnsName()); + } + + return Hazelcast.newHazelcastInstance(config); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java new file mode 100644 index 0000000000000000000000000000000000000000..d8fd7de421b88749a2077f81329213ff754e1608 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java @@ -0,0 +1,196 @@ +package theodolite.commons.workloadgeneration; + +import com.google.common.collect.Streams; +import com.hazelcast.cluster.Member; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.cp.IAtomicReference; +import com.hazelcast.cp.lock.FencedLock; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An instance of a Hazelcast runner state, that is a load generator cluster with a given set of + * members. + */ +public class HazelcastRunnerStateInstance { + + private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRunnerStateInstance.class); + + private static final Duration BEFORE_ACTION_WAIT_DURATION = Duration.ofMillis(500); + private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500); + + private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); + private LoadGeneratorExecution loadGeneratorExecution; + + private final LoadGeneratorConfig loadGeneratorConfig; + private final WorkloadDefinition totalLoadDefinition; + private final HazelcastInstance hzInstance; + private final Set<Member> members; + + /** + * Create a new {@link HazelcastRunnerStateInstance}. + */ + public HazelcastRunnerStateInstance( + final LoadGeneratorConfig loadGeneratorConfig, + final WorkloadDefinition totalLoadDefinition, + final HazelcastInstance hzInstance, + final Set<Member> members) { + this.hzInstance = hzInstance; + this.members = members; + this.loadGeneratorConfig = loadGeneratorConfig; + this.totalLoadDefinition = totalLoadDefinition; + + LOGGER.info("Created new Hazelcast runner instance for member set '{}'", this.members); + } + + /** + * Start and block load generation for the configured member set. + */ + public void runBlocking() { + if (!this.stopAction.isDone()) { + this.tryPerformBeforeAction(); + this.tryCreateTaskAssignment(); + this.startLoadGeneration(); + } + this.stopAction.join(); + this.stopLoadGeneration(); + } + + public void stopAsync() { + this.stopAction.complete(null); + } + + private void tryPerformBeforeAction() { + final FencedLock lock = this.getBeforeActionPerformerLock(); + final IAtomicReference<Boolean> isActionPerformed = this.getIsBeforeActionPerformed(); // NOPMD + isActionPerformed.alter(p -> p != null && p); // p -> p == null ? false : p + boolean triedPerformingBeforeAction = false; + while (!isActionPerformed.get()) { + // Try performing the before action + triedPerformingBeforeAction = true; + if (lock.tryLock()) { + try { + if (!isActionPerformed.get()) { + LOGGER.info("This instance is elected to perform the before action."); + this.loadGeneratorConfig.getBeforeAction().run(); + LOGGER.info("Before action performed."); + isActionPerformed.set(true); + } + } finally { + lock.unlock(); + } + } else { + LOGGER.info("Wait for before action to be performed."); + delay(BEFORE_ACTION_WAIT_DURATION); + } + } + if (!triedPerformingBeforeAction) { + LOGGER.info("Before action has already been performed."); + } + } + + + + private void tryCreateTaskAssignment() { + final Map<UUID, WorkloadDefinition> taskAssignment = this.getTaskAssignment(); + final FencedLock lock = this.getTaskAssignmentLock(); + + boolean triedCreatingTaskAssignment = false; + while (taskAssignment.size() != this.members.size()) { + // Try creating task assignment + triedCreatingTaskAssignment = true; + if (lock.tryLock()) { + try { + if (taskAssignment.size() != this.members.size()) { + LOGGER.info("This instance is elected to create the task assignment."); + + final Set<WorkloadDefinition> subLoadDefinitions = + this.totalLoadDefinition.divide(this.members.size()); + Streams + .zip( + subLoadDefinitions.stream(), + this.members.stream(), + (loadDef, member) -> new LoadDefPerMember(loadDef, member)) + .forEach(l -> taskAssignment.put(l.member.getUuid(), l.loadDefinition)); + + LOGGER.info("Task assignment created."); + } + } finally { + lock.unlock(); + } + } else { + LOGGER.info("Wait for task assignment to be available."); + delay(TASK_ASSIGNMENT_WAIT_DURATION); + } + } + if (!triedCreatingTaskAssignment) { + LOGGER.info("Task assignment is already available."); + } + } + + private void startLoadGeneration() { + if (this.loadGeneratorExecution != null) { + throw new IllegalStateException("Load generation has already started before."); + } + LOGGER.info("Start running load generation and pick assigned task."); + + final Member member = this.hzInstance.getCluster().getLocalMember(); + final WorkloadDefinition workload = this.getTaskAssignment().get(member.getUuid()); + + LOGGER.info("Run load generation for assigned task: {}", workload); + this.loadGeneratorExecution = this.loadGeneratorConfig.buildLoadGeneratorExecution(workload); + this.loadGeneratorExecution.start(); + } + + private void stopLoadGeneration() { + this.loadGeneratorExecution.stop(); + } + + private IAtomicReference<Boolean> getIsBeforeActionPerformed() { + return this.hzInstance.getCPSubsystem().getAtomicReference("isBeforeActionPerformed"); + } + + private FencedLock getBeforeActionPerformerLock() { + return this.hzInstance.getCPSubsystem().getLock("beforeActionPerformer"); + } + + private Map<UUID, WorkloadDefinition> getTaskAssignment() { + return this.hzInstance.getReplicatedMap(this.getTaskAssignmentName()); + } + + private FencedLock getTaskAssignmentLock() { + return this.hzInstance.getCPSubsystem().getLock(this.getTaskAssignmentName() + "_assigner"); + } + + private String getTaskAssignmentName() { + return this.members.stream() + .map(m -> m.getUuid().toString()) + .collect(Collectors.joining("/")); + } + + private static void delay(final Duration duration) { + try { + TimeUnit.MILLISECONDS.sleep(duration.toMillis()); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private static final class LoadDefPerMember { + public final WorkloadDefinition loadDefinition; // NOCS used only internally + public final Member member; // NOCS used only internally + + public LoadDefPerMember(final WorkloadDefinition loadDefinition, final Member member) { + this.loadDefinition = loadDefinition; + this.member = member; + } + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java similarity index 88% rename from benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java rename to benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index 33818b51084ce33a564d6f30cefb26b481d0a859..dd17234bf1adb1f0fcf3ff3ab134a0743b917369 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -1,4 +1,4 @@ -package theodolite.commons.workloadgeneration.communication.kafka; +package theodolite.commons.workloadgeneration; import java.util.Properties; import java.util.function.Function; @@ -9,7 +9,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.functions.Transport; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** @@ -17,7 +16,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> { +public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -47,10 +46,19 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> final SchemaRegistryAvroSerdeFactory avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); - this.producer = new KafkaProducer<>(properties, new StringSerializer(), + this.producer = new KafkaProducer<>( + properties, + new StringSerializer(), avroSerdeFactory.<T>forKeys().serializer()); } + public static <T extends SpecificRecord> Builder<T> builder( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + } + /** * Builder class to build a new {@link KafkaRecordSender}. * @@ -72,7 +80,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> * @param topic The topic where to write. * @param schemaRegistryUrl URL to the schema registry for avro. */ - public Builder(final String bootstrapServers, final String topic, + private Builder(final String bootstrapServers, final String topic, final String schemaRegistryUrl) { this.bootstrapServers = bootstrapServers; this.topic = topic; @@ -116,7 +124,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> } @Override - public void transport(final T message) { + public void send(final T message) { this.write(message); } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java new file mode 100644 index 0000000000000000000000000000000000000000..51255d774427a9e00de0d4c921b884022585edab --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java @@ -0,0 +1,72 @@ +package theodolite.commons.workloadgeneration; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A set of keys, where each key consists of a prefix and a number. + */ +public class KeySpace implements Serializable { + + private static final long serialVersionUID = 7343135392720315515L; // NOPMD + + private final String prefix; + private final int min; + private final int max; + + /** + * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of + * each key will be determined by a number of the interval ({@code min}, {@code max}). + * + * @param prefix the prefix to use for all keys + * @param min the lower bound (inclusive) to start counting from + * @param max the upper bound (inclusive) to count to + */ + public KeySpace(final String prefix, final int min, final int max) { + this.prefix = prefix; + this.min = min; + this.max = max; + } + + public KeySpace(final String prefix, final int numberOfKeys) { + this(prefix, 0, numberOfKeys - 1); + } + + public String getPrefix() { + return this.prefix; + } + + + public int getMin() { + return this.min; + } + + + public int getMax() { + return this.max; + } + + /** + * Get the amount of keys in this {@link KeySpace}. + */ + public int getCount() { + return this.getMax() - this.getMin() + 1; + } + + /** + * Get all keys in this {@link KeySpace}. + */ + public Collection<String> getKeys() { + return IntStream.rangeClosed(this.min, this.max) + .mapToObj(id -> this.prefix + id) + .collect(Collectors.toUnmodifiableList()); + } + + @Override + public String toString() { + return this.prefix + '[' + this.min + '-' + this.max + ']'; + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..a9a1ce65ac32e3508299c99a38ecd21e4c9461cf --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -0,0 +1,183 @@ +package theodolite.commons.workloadgeneration; + +import java.time.Duration; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Theodolite load generator. + */ +public final class LoadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + + private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + private static final String SENSOR_PREFIX_DEFAULT = "s_"; + private static final int NUMBER_OF_KEYS_DEFAULT = 10; + private static final int PERIOD_MS_DEFAULT = 1000; + private static final int VALUE_DEFAULT = 10; + private static final int THREADS_DEFAULT = 4; + private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; + private static final String KAFKA_TOPIC_DEFAULT = "input"; + private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:19092"; // NOPMD + + private ClusterConfig clusterConfig; + private WorkloadDefinition loadDefinition; + private LoadGeneratorConfig generatorConfig; + private boolean isStarted; + + private LoadGenerator() {} + + // Add constructor for creating from environment variables + + public LoadGenerator setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD + this.clusterConfig = clusterConfig; + return this; + } + + public LoadGenerator setLoadDefinition(final WorkloadDefinition loadDefinition) { // NOPMD + this.loadDefinition = loadDefinition; + return this; + } + + public LoadGenerator setGeneratorConfig(final LoadGeneratorConfig generatorConfig) { // NOPMD + this.generatorConfig = generatorConfig; + return this; + } + + public LoadGenerator withKeySpace(final KeySpace keySpace) { + this.loadDefinition = new WorkloadDefinition(keySpace, this.loadDefinition.getPeriod()); + return this; + } + + public LoadGenerator withBeforeAction(final BeforeAction beforeAction) { + this.generatorConfig.setBeforeAction(beforeAction); + return this; + } + + public LoadGenerator withThreads(final int threads) { + this.generatorConfig.setThreads(threads); + return this; + } + + /** + * Run the constructed load generator until cancellation. + */ + public void run() { + Objects.requireNonNull(this.clusterConfig, "No cluster config set."); + Objects.requireNonNull(this.generatorConfig, "No generator config set."); + Objects.requireNonNull(this.loadDefinition, "No load definition set."); + if (this.isStarted) { + throw new IllegalStateException("Load generator can only be started once."); + } + this.isStarted = true; + final HazelcastRunner runner = new HazelcastRunner( + this.clusterConfig, + this.generatorConfig, + this.loadDefinition); + runner.runBlocking(); + } + + /** + * Create a basic {@link LoadGenerator} from its default values. + */ + public static LoadGenerator fromDefaults() { + return new LoadGenerator() + .setClusterConfig(ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT)) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), + Duration.ofMillis(PERIOD_MS_DEFAULT))) + .setGeneratorConfig(new LoadGeneratorConfig( + TitanMessageGeneratorFactory + .withKafkaConfig( + KAFKA_BOOTSTRAP_SERVERS_DEFAULT, + KAFKA_TOPIC_DEFAULT, + SCHEMA_REGISTRY_URL_DEFAULT) + .forConstantValue(VALUE_DEFAULT))); + } + + /** + * Create a basic {@link LoadGenerator} from environment variables. + */ + public static LoadGenerator fromEnvironment() { + final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); + final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); + + ClusterConfig clusterConfig; + if (bootstrapServer != null) { // NOPMD + clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); + LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); + } else if (kubernetesDnsName != null) { // NOPMD + clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); + LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); + } else { + clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT); + LOGGER.info( + "Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS + BOOTSTRAP_SERVER_DEFAULT); + } + + final String port = System.getenv(ConfigurationKeys.PORT); + if (port != null) { + clusterConfig.setPort(Integer.parseInt(port)); + } + + final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); + if (portAutoIncrement != null) { + clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); + } + + final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); + if (clusterNamePrefix != null) { + clusterConfig.setClusterNamePrefix(portAutoIncrement); + } + + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.NUM_SENSORS), + Integer.toString(NUMBER_OF_KEYS_DEFAULT))); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PERIOD_MS), + Integer.toString(PERIOD_MS_DEFAULT))); + final double value = Double.parseDouble(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.VALUE), + Integer.toString(VALUE_DEFAULT))); + final int threads = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.THREADS), + Integer.toString(THREADS_DEFAULT))); + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + + return new LoadGenerator() + .setClusterConfig(clusterConfig) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), + Duration.ofMillis(periodMs))) + .setGeneratorConfig(new LoadGeneratorConfig( + TitanMessageGeneratorFactory + .withKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl, + kafkaProperties) + .forConstantValue(value))) + .withThreads(threads); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..2e907d8e90172288099bc6a1776777c37ae90fff --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java @@ -0,0 +1,42 @@ +package theodolite.commons.workloadgeneration; + +/** + * Configuration of a load generator. + */ +public class LoadGeneratorConfig { + + private final MessageGenerator messageGenerator; + private BeforeAction beforeAction = BeforeAction.doNothing(); + private int threads = 1; + + public LoadGeneratorConfig(final MessageGenerator messageGenerator) { + this.messageGenerator = messageGenerator; + } + + public LoadGeneratorConfig( + final MessageGenerator messageGenerator, + final int threads) { + this.messageGenerator = messageGenerator; + this.threads = threads; + } + + public LoadGeneratorExecution buildLoadGeneratorExecution( + final WorkloadDefinition workloadDefinition) { + return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); + } + + public BeforeAction getBeforeAction() { + return this.beforeAction; + } + + public void setThreads(final int threads) { + this.threads = threads; + } + + public void setBeforeAction(final BeforeAction beforeAction) { + this.beforeAction = beforeAction; + } + + + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java new file mode 100644 index 0000000000000000000000000000000000000000..3934c3d3499215b37ce96391ff5ae1d5cc135f84 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java @@ -0,0 +1,56 @@ +package theodolite.commons.workloadgeneration; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be + * started and stopped. + */ +public class LoadGeneratorExecution { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class); + + private final Random random = new Random(); + private final WorkloadDefinition workloadDefinition; + private final MessageGenerator messageGenerator; + private final ScheduledExecutorService executor; + + /** + * Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a + * {@link MessageGenerator}. Load is generated by the given number of threads. + */ + public LoadGeneratorExecution( + final WorkloadDefinition workloadDefinition, + final MessageGenerator messageGenerator, + final int threads) { + this.workloadDefinition = workloadDefinition; + this.messageGenerator = messageGenerator; + this.executor = Executors.newScheduledThreadPool(threads); + } + + /** + * Start the load generation and run it until it is stopped. + */ + public void start() { + LOGGER.info("Beginning of Experiment..."); + LOGGER.info("Generating records for {} keys.", + this.workloadDefinition.getKeySpace().getCount()); + LOGGER.info("Experiment is going to be executed until cancelation..."); + + final int periodMs = (int) this.workloadDefinition.getPeriod().toMillis(); + for (final String key : this.workloadDefinition.getKeySpace().getKeys()) { + final long initialDelay = this.random.nextInt(periodMs); + final Runnable task = () -> this.messageGenerator.generate(key); + this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS); + } + } + + public void stop() { + this.executor.shutdownNow(); + } +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..c369f16557d60dae50e22ec7ad820c6a0ab4d137 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java @@ -0,0 +1,18 @@ +package theodolite.commons.workloadgeneration; + +/** + * Interface representing a message generator, which sends messages for given keys to some + * destination. + */ +@FunctionalInterface +public interface MessageGenerator { + + void generate(final String key); + + public static <T> MessageGenerator from( + final RecordGenerator<T> generator, + final RecordSender<T> sender) { + return key -> sender.send(generator.generate(key)); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..ea6501f38ea57bf6cefb5c76b05f442454ca0d99 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordGenerator.java @@ -0,0 +1,14 @@ +package theodolite.commons.workloadgeneration; + +/** + * This interface describes a function that takes meta information from a string key and produces an + * object of type T. + * + * @param <T> the type of the objects that will be generated by the function. + */ +@FunctionalInterface +public interface RecordGenerator<T> { + + T generate(final String key); + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordSender.java similarity index 68% rename from benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java rename to benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordSender.java index 7e5100a4e99f13a98156311a9d892c9626b2318a..ee57f2f239a34dd6f8f329d47e4d698427e371b0 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/RecordSender.java @@ -1,4 +1,4 @@ -package theodolite.commons.workloadgeneration.functions; +package theodolite.commons.workloadgeneration; /** * This interface describes a function that consumes a message {@code T}. This function is dedicated @@ -7,8 +7,8 @@ package theodolite.commons.workloadgeneration.functions; * @param <T> the type of records to send as messages. */ @FunctionalInterface -public interface Transport<T> { +public interface RecordSender<T> { - void transport(final T message); + void send(final T message); } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..bd0b41d4e6e004d024ed2fd179eddcf6af50438f --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java @@ -0,0 +1,57 @@ +package theodolite.commons.workloadgeneration; + +import java.util.Properties; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s + * and sends them via Kafka. + */ +public final class TitanMessageGeneratorFactory { + + private final RecordSender<ActivePowerRecord> recordSender; + + private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) { + this.recordSender = recordSender; + } + + /** + * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a + * constant value. + */ + public MessageGenerator forConstantValue(final double value) { + return MessageGenerator.from( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value), + this.recordSender); + } + + /** + * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. + */ + public static TitanMessageGeneratorFactory withKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); + } + + /** + * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. + */ + public static TitanMessageGeneratorFactory withKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl, + final Properties properties) { + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender + .<ActivePowerRecord>builder( + bootstrapServers, + topic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + return new TitanMessageGeneratorFactory(kafkaRecordSender); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java new file mode 100644 index 0000000000000000000000000000000000000000..5795cad7a4d942476116f6453758aa2304b5eda0 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java @@ -0,0 +1,71 @@ +package theodolite.commons.workloadgeneration; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Definition of a workload consisting of a {@link KeySpace} and a period with which messages will + * be generated for key of that {@link KeySpace}. + */ +public class WorkloadDefinition implements Serializable { + + private static final long serialVersionUID = -8337364281221817001L; // NOPMD + + private final KeySpace keySpace; + private final Duration period; + + /** + * Create a new workload definition. + * + * @param keySpace the key space to use. + */ + public WorkloadDefinition( + final KeySpace keySpace, + final Duration period) { + this.keySpace = keySpace; + this.period = period; + } + + public KeySpace getKeySpace() { + return this.keySpace; + } + + public Duration getPeriod() { + return this.period; + } + + /** + * Divide this {@link WorkloadDefinition} into {@code parts} {@link WorkloadDefinition}s by + * distributing its {@link KeySpace} (almost) equally among all {@link WorkloadDefinition}s. + */ + public Set<WorkloadDefinition> divide(final int parts) { + final int effParts = Math.min(parts, this.keySpace.getCount()); + final int minSize = this.keySpace.getCount() / effParts; + final int largerParts = this.keySpace.getCount() % effParts; + return IntStream.range(0, effParts) + .mapToObj(part -> { + final int thisSize = part < largerParts ? minSize + 1 : minSize; + final int largePartsBefore = Math.min(largerParts, part); + final int smallPartsBefore = part - largePartsBefore; + final int start = largePartsBefore * (minSize + 1) + smallPartsBefore * minSize; + final int end = start + thisSize - 1; + return new KeySpace( + this.keySpace.getPrefix(), + start, + end); + }) + .map(keySpace -> new WorkloadDefinition( + keySpace, + this.period)) + .collect(Collectors.toUnmodifiableSet()); + } + + @Override + public String toString() { + return this.keySpace + ";" + this.period.toMillis(); + } + +} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java deleted file mode 100644 index 2249abcbcb1071cf880b2ee80f5d41f2b3dab463..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java +++ /dev/null @@ -1,202 +0,0 @@ -package theodolite.commons.workloadgeneration.communication.zookeeper; - -import java.nio.charset.StandardCharsets; -import java.util.function.BiConsumer; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.recipes.atomic.AtomicValue; -import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.functions.BeforeAction; -import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; - -/** - * The central class responsible for distributing the workload through all workload generators. - */ -public class WorkloadDistributor { - - private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class); - - private static final String NAMESPACE = "workload-generation"; - private static final String COUNTER_PATH = "/counter"; - private static final String WORKLOAD_PATH = "/workload"; - private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; - - // Curator retry strategy - private static final int BASE_SLEEP_TIME_MS = 2000; - private static final int MAX_RETRIES = 5; - - // Wait time - private static final int MAX_WAIT_TIME = 20_000; - - private final DistributedAtomicInteger counter; - private final KeySpace keySpace; - private final BeforeAction beforeAction; - private final BiConsumer<WorkloadDefinition, Integer> workerAction; - - private final int instances; - private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable - private final CuratorFramework client; - - private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false - - /** - * Create a new workload distributor. - * - * @param keySpace the keyspace for the workload generation. - * @param beforeAction the before action for the workload generation. - * @param workerAction the action to perform by the workers. - */ - public WorkloadDistributor( - final int instances, - final ZooKeeper zooKeeper, - final KeySpace keySpace, - final BeforeAction beforeAction, - final BiConsumer<WorkloadDefinition, Integer> workerAction) { - this.instances = instances; - this.zooKeeper = zooKeeper; - this.keySpace = keySpace; - this.beforeAction = beforeAction; - this.workerAction = workerAction; - - this.client = CuratorFrameworkFactory.builder() - .namespace(NAMESPACE) - .connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort()) - .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)) - .build(); - - this.client.start(); - - try { - this.client.blockUntilConnected(); - } catch (final InterruptedException e) { - LOGGER.error(e.getMessage(), e); - throw new IllegalStateException(e); - } - - this.counter = - new DistributedAtomicInteger(this.client, COUNTER_PATH, - new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)); - } - - /** - * Start the workload distribution. - */ - public void start() { - try { - AtomicValue<Integer> result = this.counter.increment(); - while (!result.succeeded()) { - result = this.counter.increment(); - } - - final int workerId = result.preValue(); - - final CuratorWatcher watcher = this.buildWatcher(workerId); - - final Stat nodeExists = - this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH); - if (nodeExists == null) { - this.client.create().forPath(WORKLOAD_PATH); - } - - if (workerId == 0) { - LOGGER.info("This instance is master with id {}", workerId); - - this.beforeAction.run(); - - // register worker action, as master acts also as worker - this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); - - LOGGER.info("Number of Workers: {}", this.instances); - - final WorkloadDefinition definition = - new WorkloadDefinition(this.keySpace, this.instances); - - this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH, - definition.toString().getBytes(StandardCharsets.UTF_8)); - - } else { - LOGGER.info("This instance is worker with id {}", workerId); - - this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); - - final Stat definitionExists = - this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); - - if (definitionExists != null) { - this.startWorkloadGeneration(workerId); - } - } - - Thread.sleep(MAX_WAIT_TIME); - - if (!this.workloadGenerationStarted) { - LOGGER.warn("No workload definition retrieved for 20 s. Terminating now.."); - } - } catch (final Exception e) { // NOPMD need to catch exception because of external framework - LOGGER.error(e.getMessage(), e); - throw new IllegalStateException("Error when starting the distribution of the workload.", e); - } - } - - /** - * Start the workload generation. This methods body does only get executed once. - * - * @param workerId the ID of this worker - * @throws Exception when an error occurs - */ - // NOPMD because exception thrown from used framework - private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD - - if (!this.workloadGenerationStarted) { - this.workloadGenerationStarted = true; - - final byte[] bytes = - this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); - final WorkloadDefinition definition = - WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); - - this.workerAction.accept(definition, workerId); - } - } - - /** - * Build a curator watcher which performs the worker action. - * - * @param worker the worker to create the watcher for. - * @return the curator watcher. - */ - private CuratorWatcher buildWatcher(final int workerId) { - return new CuratorWatcher() { - - @Override - public void process(final WatchedEvent event) { - if (event.getType() == EventType.NodeChildrenChanged) { - try { - WorkloadDistributor.this.startWorkloadGeneration(workerId); - } catch (final Exception e) { // NOPMD external framework throws exception - LOGGER.error(e.getMessage(), e); - throw new IllegalStateException("Error starting workload generation.", e); - } - } - } - }; - } - - /** - * Stop the workload distributor. - */ - public void stop() { - this.client.close(); - } - -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java deleted file mode 100644 index 2eaa1d487f67ae8325a3622a7ae6c4529fbb1cd6..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java +++ /dev/null @@ -1,56 +0,0 @@ -package theodolite.commons.workloadgeneration.dimensions; - -import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; - -/** - * Wrapper class for the definition of the Keys that should be used by the - * {@link AbstractWorkloadGenerator}. - */ -public class KeySpace { - - private final String prefix; - private final int min; - private final int max; - - - /** - * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of - * each key will be determined by a number of the interval ({@code min}, {@code max}-1). - * - * @param prefix the prefix to use for all keys - * @param min the lower bound (inclusive) to start counting from - * @param max the upper bound (exclusive) to count to - */ - public KeySpace(final String prefix, final int min, final int max) { - if (prefix == null || prefix.contains(";")) { - throw new IllegalArgumentException( - "The prefix must not be null and must not contain the ';' character."); - } - this.prefix = prefix; - this.min = min; - this.max = max; - - } - - public KeySpace(final String prefix, final int numberOfKeys) { - this(prefix, 0, numberOfKeys - 1); - } - - public KeySpace(final int numberOfKeys) { - this("sensor_", 0, numberOfKeys - 1); - } - - public String getPrefix() { - return this.prefix; - } - - - public int getMin() { - return this.min; - } - - - public int getMax() { - return this.max; - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java deleted file mode 100644 index 672b579ebbdf3cbb08f3d05d9511c9077f9dac6b..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java +++ /dev/null @@ -1,14 +0,0 @@ -package theodolite.commons.workloadgeneration.functions; - -/** - * This interface describes a function that takes meta information from a string (e.g. an ID) and - * produces an object of type T. - * - * @param <T> the type of the objects that will be generated by the function. - */ -@FunctionalInterface -public interface MessageGenerator<T> { - - T generateMessage(final String key); - -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java deleted file mode 100644 index 104f1cefb34200a2cf34d1578faecdfdae6ccd56..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ /dev/null @@ -1,138 +0,0 @@ -package theodolite.commons.workloadgeneration.generators; - -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.functions.BeforeAction; -import theodolite.commons.workloadgeneration.functions.MessageGenerator; -import theodolite.commons.workloadgeneration.functions.Transport; -import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; -import theodolite.commons.workloadgeneration.misc.WorkloadEntity; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; - -/** - * Base for workload generators. - * - * @param <T> The type of records the workload generator is dedicated for. - */ -public abstract class AbstractWorkloadGenerator<T> - implements WorkloadGenerator { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); - - private final int instances; // NOPMD keep instance variable instead of local variable - private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable - private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable - private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable - private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; - private final MessageGenerator<T> generatorFunction; - private final Transport<T> transport; - private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local - private final ScheduledExecutorService executor; - - /** - * Create a new workload generator. - * - * @param instances the number of workload-generator instances. - * @param zooKeeper the zookeeper connection. - * @param keySpace the keyspace. - * @param threads the number of threads that is used to generate the load. - * @param period the period, how often a new record is emitted. - * @param duration the maximum runtime. - * @param beforeAction the action to perform before the workload generation starts. - * @param generatorFunction the function that is used to generate the individual records. - * @param transport the function that is used to send generated messages to the messaging system. - */ - public AbstractWorkloadGenerator( - final int instances, - final ZooKeeper zooKeeper, - final KeySpace keySpace, - final int threads, - final Duration period, - final Duration duration, - final BeforeAction beforeAction, - final MessageGenerator<T> generatorFunction, - final Transport<T> transport) { - this.instances = instances; - this.zooKeeper = zooKeeper; - this.keySpace = keySpace; - this.beforeAction = beforeAction; - this.generatorFunction = generatorFunction; - this.workloadSelector = (workloadDefinition, workerId) -> { - final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); - - for (int i = - workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition - .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) { - final String id = workloadDefinition.getKeySpace().getPrefix() + i; - workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); - } - - return workloadEntities; - }; - this.transport = transport; - - this.executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - final int periodMs = (int) period.toMillis(); - - LOGGER.info("Period: {}", periodMs); - - final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> { - - final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId); - - LOGGER.info("Beginning of Experiment..."); - LOGGER.info("Generating records for {} keys.", entities.size()); - LOGGER.info("Experiment is going to be executed for the specified duration..."); - - entities.forEach(entity -> { - final long initialDelay = random.nextInt(periodMs); - final Runnable task = () -> this.transport.transport(entity.generateMessage()); - this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS); - }); - - - try { - this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS); - LOGGER.info("Terminating now..."); - this.stop(); - } catch (final InterruptedException e) { - LOGGER.error("", e); - throw new IllegalStateException("Error when terminating the workload generation.", e); - } - }; - - this.workloadDistributor = new WorkloadDistributor( - this.instances, - this.zooKeeper, - this.keySpace, - this.beforeAction, - workerAction); - } - - /** - * Start the workload generation. The generation terminates automatically after the specified - * {@code duration}. - */ - @Override - public void start() { - this.workloadDistributor.start(); - } - - @Override - public void stop() { - this.workloadDistributor.stop(); - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java deleted file mode 100644 index 944cec6a2dffed886f06fad1e36c9d35375fe15c..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ /dev/null @@ -1,59 +0,0 @@ -package theodolite.commons.workloadgeneration.generators; - -import java.time.Duration; -import org.apache.avro.specific.SpecificRecord; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.functions.BeforeAction; -import theodolite.commons.workloadgeneration.functions.MessageGenerator; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; - -/** - * Workload generator for generating load for the kafka messaging system. - * - * @param <T> The type of records the workload generator is dedicated for. - */ -public class KafkaWorkloadGenerator<T extends SpecificRecord> - extends AbstractWorkloadGenerator<T> { - - private final KafkaRecordSender<T> recordSender; - - /** - * Create a new workload generator. - * - * @param zooKeeper a reference to the ZooKeeper instance. - * @param keySpace the key space to generate the workload for. - * @param threads tha amount of threads to use per instance. - * @param period the period how often a message is generated for each key specified in the - * {@code keySpace} - * @param duration the duration how long the workload generator will emit messages. - * @param beforeAction the action which will be performed before the workload generator starts - * generating messages. If {@code null}, no before action will be performed. - * @param generatorFunction the generator function. This function is executed, each time a message - * is generated. - * @param recordSender the record sender which is used to send the generated messages to kafka. - */ - public KafkaWorkloadGenerator( - final int instances, - final ZooKeeper zooKeeper, - final KeySpace keySpace, - final int threads, - final Duration period, - final Duration duration, - final BeforeAction beforeAction, - final MessageGenerator<T> generatorFunction, - final KafkaRecordSender<T> recordSender) { - super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction, - generatorFunction, - recordSender); - this.recordSender = recordSender; - } - - - @Override - public void stop() { - this.recordSender.terminate(); - - super.stop(); - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java deleted file mode 100644 index 785087c13480b7149a5726dfce8bbf4307b57933..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ /dev/null @@ -1,185 +0,0 @@ -package theodolite.commons.workloadgeneration.generators; - -import java.time.Duration; -import java.util.Objects; -import org.apache.avro.specific.SpecificRecord; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.functions.BeforeAction; -import theodolite.commons.workloadgeneration.functions.MessageGenerator; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; - -/** - * Builder for {@link workload generators}. - * - * @param <T> the record for which the builder is dedicated for. - */ -public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD - - private int instances; // NOPMD - private ZooKeeper zooKeeper; // NOPMD - private KeySpace keySpace; // NOPMD - private int threads; // NOPMD - private Duration period; // NOPMD - private Duration duration; // NOPMD - private BeforeAction beforeAction; // NOPMD - private MessageGenerator<T> generatorFunction; // NOPMD - private KafkaRecordSender<T> kafkaRecordSender; // NOPMD - - private KafkaWorkloadGeneratorBuilder() { - - } - - /** - * Get a builder for the {@link KafkaWorkloadGenerator}. - * - * @return the builder. - */ - public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() { - return new KafkaWorkloadGeneratorBuilder<>(); - } - - /** - * Set the number of instances. - * - * @param instances the number of instances. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) { - this.instances = instances; - return this; - } - - /** - * Set the ZooKeeper reference. - * - * @param zooKeeper a reference to the ZooKeeper instance. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) { - this.zooKeeper = zooKeeper; - return this; - } - - /** - * Set the before action for the {@link KafkaWorkloadGenerator}. - * - * @param beforeAction the {@link BeforeAction}. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) { - this.beforeAction = beforeAction; - return this; - } - - /** - * Set the key space for the {@link KafkaWorkloadGenerator}. - * - * @param keySpace the {@link KeySpace}. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) { - this.keySpace = keySpace; - return this; - } - - /** - * Set the key space for the {@link KafkaWorkloadGenerator}. - * - * @param threads the number of threads. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) { - this.threads = threads; - return this; - } - - /** - * Set the period for the {@link KafkaWorkloadGenerator}. - * - * @param period the {@link Period} - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) { - this.period = period; - return this; - } - - /** - * Set the durtion for the {@link KafkaWorkloadGenerator}. - * - * @param duration the {@link Duration}. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) { - this.duration = duration; - return this; - } - - /** - * Set the generator function for the {@link KafkaWorkloadGenerator}. - * - * @param generatorFunction the generator function. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> generatorFunction( - final MessageGenerator<T> generatorFunction) { - this.generatorFunction = generatorFunction; - return this; - } - - /** - * Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}. - * - * @param kafkaRecordSender the record sender to use. - * @return the builder. - */ - public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender( - final KafkaRecordSender<T> kafkaRecordSender) { - this.kafkaRecordSender = kafkaRecordSender; - return this; - } - - /** - * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be - * specicified before this method is called: - * <ul> - * <li>zookeeper</li> - * <li>key space</li> - * <li>period</li> - * <li>duration</li> - * <li>generator function</li> - * <li>kafka record sender</li> - * </ul> - * - * @return the built instance of the {@link KafkaWorkloadGenerator}. - */ - public KafkaWorkloadGenerator<T> build() { - if (this.instances < 1) { // NOPMD - throw new IllegalArgumentException( - "Please specify a valid number of instances. Currently: " + this.instances); - } - Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance."); - if (this.threads < 1) { // NOPMD - this.threads = 1; - } - Objects.requireNonNull(this.keySpace, "Please specify the key space."); - Objects.requireNonNull(this.period, "Please specify the period."); - Objects.requireNonNull(this.duration, "Please specify the duration."); - this.beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> { - }); - Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); - Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); - - return new KafkaWorkloadGenerator<>( - this.instances, - this.zooKeeper, - this.keySpace, - this.threads, - this.period, - this.duration, - this.beforeAction, - this.generatorFunction, - this.kafkaRecordSender); - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java deleted file mode 100644 index b121ac157b84d64818d9fdfc90589d49fd933752..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java +++ /dev/null @@ -1,18 +0,0 @@ -package theodolite.commons.workloadgeneration.generators; - -/** - * Base methods for workload generators. - */ -public interface WorkloadGenerator { - - /** - * Start the workload generation. - */ - void start(); - - /** - * Stop the workload generation. - */ - void stop(); - -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java deleted file mode 100644 index 86369d6c883954b792b2ee0fd6a988377ecb8965..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java +++ /dev/null @@ -1,71 +0,0 @@ -package theodolite.commons.workloadgeneration.misc; - -import theodolite.commons.workloadgeneration.dimensions.KeySpace; - -/** - * The central class that contains all information that needs to be exchanged between the nodes for - * distributed workload generation. - */ -public class WorkloadDefinition { - private static final int ZERO = 0; - private static final int ONE = 1; - private static final int TWO = 2; - private static final int THREE = 3; - private static final int FOUR = 4; - - private final KeySpace keySpace; - private final int numberOfWorkers; - - /** - * Create a new workload definition. - * - * @param keySpace the key space to use. - * @param numberOfWorkers the number of workers participating in the workload generation. - */ - public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) { - - this.keySpace = keySpace; - this.numberOfWorkers = numberOfWorkers; - } - - public KeySpace getKeySpace() { - return this.keySpace; - } - - public int getNumberOfWorkers() { - return this.numberOfWorkers; - } - - /** - * Simple method for encoding all information of the workload definition into one string. - * - * @return a string that encodes all information of the workload generation in a compact format. - * The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'. - */ - @Override - public String toString() { - return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" - + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); - } - - /** - * Parse a workload generation from a previously encoded string with the format returned by - * {@link WorkloadDefinition#toString()}. - * - * @param workloadDefinitionString the workload definition string. - * @return the parsed workload definition. - */ - public static WorkloadDefinition fromString(final String workloadDefinitionString) { - final String[] deserialized = workloadDefinitionString.split(";"); - - if (deserialized.length != FOUR) { - throw new IllegalArgumentException( - "Wrong workload definition string when trying to parse the workload generation."); - } - - return new WorkloadDefinition( - new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]), - Integer.valueOf(deserialized[TWO])), - Integer.valueOf(deserialized[THREE])); - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java deleted file mode 100644 index d8665b3fb53e7d15ed61780e3b91fbfe56f709ba..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java +++ /dev/null @@ -1,22 +0,0 @@ -package theodolite.commons.workloadgeneration.misc; - -import theodolite.commons.workloadgeneration.functions.MessageGenerator; - -/** - * Representation of a entity of the workload generation that generates load for one fixed key. - * - * @param <T> The type of records the workload generator is dedicated for. - */ -public class WorkloadEntity<T> { - private final String key; - private final MessageGenerator<T> generator; - - public WorkloadEntity(final String key, final MessageGenerator<T> generator) { - this.key = key; - this.generator = generator; - } - - public T generateMessage() { - return this.generator.generateMessage(this.key); - } -} diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java deleted file mode 100644 index a80490600ad9c9c22c198fc76b6d9f73bdc30584..0000000000000000000000000000000000000000 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java +++ /dev/null @@ -1,29 +0,0 @@ -package theodolite.commons.workloadgeneration.misc; - -/** - * Wrapper for connection information for ZooKeeper. - */ -public class ZooKeeper { - - private final String host; - private final int port; - - /** - * Create a new representation of an ZooKeeper instance. - * - * @param host of zookeeper. - * @param port of zookeeper. - */ - public ZooKeeper(final String host, final int port) { - this.host = host; - this.port = port; - } - - public String getHost() { - return this.host; - } - - public int getPort() { - return this.port; - } -} diff --git a/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..20c094ddcc7ff110a25aaffa494766e89d4d2475 --- /dev/null +++ b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java @@ -0,0 +1,30 @@ +package theodolite.commons.workloadgeneration; + +import org.junit.Assert; +import org.junit.Test; +import theodolite.commons.workloadgeneration.KeySpace; + +public class KeySpaceTest { + + @Test + public void testCountFixedRangeFromZero() { + final KeySpace keySpace = new KeySpace("prefix", 0, 9); + final int count = keySpace.getCount(); + Assert.assertEquals(10, count); + } + + @Test + public void testCountFixedRangeNotFromZero() { + final KeySpace keySpace = new KeySpace("prefix", 4, 11); + final int count = keySpace.getCount(); + Assert.assertEquals(8, count); + } + + @Test + public void testCountAutoRange() { + final KeySpace keySpace = new KeySpace("prefix", 42); + final int count = keySpace.getCount(); + Assert.assertEquals(42, count); + } + +} diff --git a/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/WorkloadDefinitionTest.java b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/WorkloadDefinitionTest.java new file mode 100644 index 0000000000000000000000000000000000000000..9a5dbf2d20e9e33b5902e5f352dc8a4023478cdf --- /dev/null +++ b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/WorkloadDefinitionTest.java @@ -0,0 +1,97 @@ +package theodolite.commons.workloadgeneration; + +import java.time.Duration; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Test; + +public class WorkloadDefinitionTest { + + @Test + public void testDivideByOneAmount() { + final KeySpace keySpace = new KeySpace("prefix", 100); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(1); + Assert.assertEquals(1, subworkloads.size()); + } + + @Test + public void testDivideMultipleAmount() { + final KeySpace keySpace = new KeySpace("prefix", 100); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(2); + Assert.assertEquals(2, subworkloads.size()); + } + + @Test + public void testDivideNonMultipleAmount() { + final KeySpace keySpace = new KeySpace("prefix", 100); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(3); + Assert.assertEquals(3, subworkloads.size()); + } + + @Test + public void testDivide() { + final KeySpace keySpace = new KeySpace("prefix", 100); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(3); + Assert.assertEquals(3, subworkloads.size()); + for (final WorkloadDefinition subworkload : subworkloads) { + Assert.assertEquals("prefix", subworkload.getKeySpace().getPrefix()); + Assert.assertEquals(Duration.ofSeconds(1), subworkload.getPeriod()); + } + final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() + .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) + .collect(Collectors.toList()); + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); + Assert.assertEquals(0, subworkload1.getKeySpace().getMin()); + Assert.assertEquals(33, subworkload1.getKeySpace().getMax()); + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); + Assert.assertEquals(34, subworkload2.getKeySpace().getMin()); + Assert.assertEquals(66, subworkload2.getKeySpace().getMax()); + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); + Assert.assertEquals(67, subworkload3.getKeySpace().getMin()); + Assert.assertEquals(99, subworkload3.getKeySpace().getMax()); + } + + @Test + public void testDivideMany() { + final KeySpace keySpace = new KeySpace("prefix", 10); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(7); + Assert.assertEquals(7, subworkloads.size()); + for (final WorkloadDefinition subworkload : subworkloads) { + Assert.assertEquals("prefix", subworkload.getKeySpace().getPrefix()); + Assert.assertEquals(Duration.ofSeconds(1), subworkload.getPeriod()); + } + final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() + .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) + .collect(Collectors.toList()); + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); + Assert.assertEquals(0, subworkload1.getKeySpace().getMin()); + Assert.assertEquals(1, subworkload1.getKeySpace().getMax()); + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); + Assert.assertEquals(2, subworkload2.getKeySpace().getMin()); + Assert.assertEquals(3, subworkload2.getKeySpace().getMax()); + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); + Assert.assertEquals(4, subworkload3.getKeySpace().getMin()); + Assert.assertEquals(5, subworkload3.getKeySpace().getMax()); + final WorkloadDefinition subworkload4 = orderedSubworkloads.get(3); + Assert.assertEquals(6, subworkload4.getKeySpace().getMin()); + Assert.assertEquals(6, subworkload4.getKeySpace().getMax()); + final WorkloadDefinition subworkload5 = orderedSubworkloads.get(4); + Assert.assertEquals(7, subworkload5.getKeySpace().getMin()); + Assert.assertEquals(7, subworkload5.getKeySpace().getMax()); + final WorkloadDefinition subworkload6 = orderedSubworkloads.get(5); + Assert.assertEquals(8, subworkload6.getKeySpace().getMin()); + Assert.assertEquals(8, subworkload6.getKeySpace().getMax()); + final WorkloadDefinition subworkload7 = orderedSubworkloads.get(6); + Assert.assertEquals(9, subworkload7.getKeySpace().getMin()); + Assert.assertEquals(9, subworkload7.getKeySpace().getMax()); + } + +} diff --git a/execution/README.md b/execution/README.md index 6c6203ad983549bf0ed2fdb040f4165dc36bd6bd..93cd7656d9e5faee04649d874b8fb5ad705c08a7 100644 --- a/execution/README.md +++ b/execution/README.md @@ -247,11 +247,12 @@ Kubernetes volume. | --duration | DURATION | Duration in minutes subexperiments should be executed for. *Default:* `5`. | | --partitions | PARTITIONS | Number of partitions for Kafka topics. *Default:* `40`. | | --cpu-limit | CPU_LIMIT | Kubernetes CPU limit for a single Pod. *Default:* `1000m`. | -| --memory-limiT | MEMORY_LIMIT | Kubernetes memory limit for a single Pod. *Default:* `4Gi`. | +| --memory-limit | MEMORY_LIMIT | Kubernetes memory limit for a single Pod. *Default:* `4Gi`. | | --domain-restriction | DOMAIN_RESTRICTION | A flag that indiciates domain restriction should be used. *Default:* not set. For more details see Section [Domain Restriction](#domain-restriction). | | --search-strategy | SEARCH_STRATEGY | The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. *Default:* `check-all`. For more details see Section [Benchmarking Search Strategies](#benchmarking-search-strategies). | | --reset | RESET | Resets the environment before each subexperiment. Useful if execution was aborted and just one experiment should be executed. | | --reset-only | RESET_ONLY | Only resets the environment. Ignores all other parameters. Useful if execution was aborted and one want a clean state for new executions. | +| --namespace | NAMESPACE | Kubernetes namespace. *Default:* `default`. | | --prometheus | PROMETHEUS_BASE_URL | Defines where to find the prometheus instance. *Default:* `http://localhost:9090` | | --path | RESULT_PATH | A directory path for the results. Relative to the Execution folder. *Default:* `results` | | --configurations | CONFIGURATIONS | Defines environment variables for the use cases and, thus, enables further configuration options. | diff --git a/execution/run_uc.py b/execution/run_uc.py index 9bbb2876447438c1c3ac676091b11f6baa990622..7f5580f20863482281095aa1fda220393c91e8b0 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -94,6 +94,7 @@ def load_yaml_files(): :return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy """ print('Load kubernetes yaml files') + wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml') wg = load_yaml('uc-workload-generator/workloadGenerator.yaml') app_svc = load_yaml('uc-application/aggregation-service.yaml') app_svc_monitor = load_yaml('uc-application/service-monitor.yaml') @@ -101,7 +102,7 @@ def load_yaml_files(): app_deploy = load_yaml('uc-application/aggregation-deployment.yaml') print('Kubernetes yaml files loaded') - return wg, app_svc, app_svc_monitor, app_jmx, app_deploy + return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy def replace_env_value(container, key, value): @@ -113,8 +114,9 @@ def replace_env_value(container, key, value): 'value'] = value -def start_workload_generator(wg_yaml, dim_value, uc_id): +def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id): """Starts the workload generator. + :param wg_yaml: The yaml object for the workload generator service. :param wg_yaml: The yaml object for the workload generator. :param string dim_value: The dimension value the load generator should use. :param string uc_id: Use case id for which load should be generated. @@ -123,7 +125,18 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): the yaml object. """ print('Start workload generator') + svc, wg_deploy = None, None + # Create Service + try: + svc = coreApi.create_namespaced_service( + namespace=namespace, body=svc_yaml) + print(f'Service {svc.metadata.name} created.') + except client.rest.ApiException as e: + svc = svc_yaml + logging.error("Service creation error: %s", e.reason) + + # Create Deployment num_sensors = dim_value wl_max_records = 150000 wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records @@ -147,22 +160,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): # Set environment variables replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) - replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances)) if uc_id == '2': # Special configuration for uc2 replace_env_value( wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) try: - wg_ss = appsApi.create_namespaced_deployment( + wg_deploy = appsApi.create_namespaced_deployment( namespace=namespace, body=wg_yaml ) - print(f'Deployment {wg_ss.metadata.name} created.') - return wg_ss + print(f'Deployment {wg_deploy.metadata.name} created.') except client.rest.ApiException as e: print(f'Deployment creation error: {e.reason}') - return wg_yaml + wg_deploy = wg_yaml + + return svc, wg_deploy def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, @@ -317,19 +330,23 @@ def delete_resource(obj, del_func): print('Resource deleted') -def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): +def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy): """Stops the applied applications and delete resources. - :param wg: The workload generator statefull set. + :param wg: The load generator service. + :param wg: The load generator deployment. :param app_svc: The application service. :param app_svc_monitor: The application service monitor. :param app_jmx: The application jmx config map. :param app_deploy: The application deployment. """ - print('Stop use case application and workload generator') + print('Stop use case application and load generator') - print('Delete workload generator') + print('Delete load generator deployment') delete_resource(wg, appsApi.delete_namespaced_deployment) + print('Delete load generator service') + delete_resource(wg_svc, coreApi.delete_namespaced_service) + print('Delete app service') delete_resource(app_svc, coreApi.delete_namespaced_service) @@ -492,12 +509,12 @@ def stop_lag_exporter(): return -def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): +def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): """ Stop the applications, delete topics, reset zookeeper and stop lag exporter. """ print('Reset cluster') - stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy) + stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy) print('---------------------') delete_topics(topics) print('---------------------') @@ -524,7 +541,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi """ global namespace namespace = ns - wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() + wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() print('---------------------') initialize_kubernetes_api() @@ -538,24 +555,24 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi # Check for reset options if reset_only: # Only reset cluster an then end program - reset_cluster(wg, app_svc, app_svc_monitor, + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) sys.exit() if reset: # Reset cluster before execution print('Reset only mode') - reset_cluster(wg, app_svc, app_svc_monitor, + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) print('---------------------') # Register the reset operation so that is executed at the abort of program - atexit.register(reset_cluster, wg, app_svc, + atexit.register(reset_cluster, wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) create_topics(topics) print('---------------------') - wg = start_workload_generator(wg, dim_value, uc_id) + wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id) print('---------------------') app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( @@ -578,7 +595,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi print('---------------------') # Reset cluster regular, therefore abort exit not needed anymore - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) atexit.unregister(reset_cluster) diff --git a/execution/uc-workload-generator/load-generator-service.yaml b/execution/uc-workload-generator/load-generator-service.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c1299e373009dee5fa4cc87093ebc684c7f2e333 --- /dev/null +++ b/execution/uc-workload-generator/load-generator-service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: titan-ccp-load-generator + labels: + app: titan-ccp-load-generator +spec: + type: ClusterIP + clusterIP: None + selector: + app: titan-ccp-load-generator + ports: + - name: coordination + port: 5701 + targetPort: 5701 + protocol: TCP diff --git a/execution/uc-workload-generator/workloadGenerator.yaml b/execution/uc-workload-generator/workloadGenerator.yaml index 794468b18dc74ca09872577b5b3c115605bd4620..146e285f66d4c0e1a88d613e4ac2d5571234fad6 100644 --- a/execution/uc-workload-generator/workloadGenerator.yaml +++ b/execution/uc-workload-generator/workloadGenerator.yaml @@ -16,23 +16,22 @@ spec: containers: - name: workload-generator image: workload-generator:latest + ports: + - containerPort: 5701 + name: coordination env: # Order need to be preserved for run_uc.py - name: NUM_SENSORS value: "25000" - - name: INSTANCES - value: "1" - name: NUM_NESTED_GROUPS value: "5" - - name: ZK_HOST - value: "my-confluent-cp-zookeeper" - - name: ZK_PORT - value: "2181" + - name: KUBERNETES_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: KUBERNETES_DNS_NAME + value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: SCHEMA_REGISTRY_URL value: "http://my-confluent-cp-schema-registry:8081" - - name: POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name