diff --git a/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891754324a6e1bf55348b6a38f592bb301..4d01df75552c562406705858b6368ecf59d6e82f 100644 --- a/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/benchmarks/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true @@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates= sp_cleanup.add_default_serial_version_id=true sp_cleanup.add_generated_serial_version_id=false sp_cleanup.add_missing_annotations=true diff --git a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index 1d9e85bd07bf5e47eca979e2bec0d5524f3a6a83..a5dc622a5102aaa33eb1604f1bf7bfa8a3b61680 100644 --- a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -1,19 +1,8 @@ package theodolite.uc1.workloadgenerator; import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.workloadgeneration.KeySpace; -import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; -import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.model.records.ActivePowerRecord; /** * Load Generator for UC1. @@ -22,73 +11,13 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private static final long MAX_DURATION_IN_DAYS = 30L; - private LoadGenerator() {} /** - * Entry point. + * Start load generator for use case UC1. */ public static void main(final String[] args) throws InterruptedException, IOException { - // uc1 LOGGER.info("Start workload generator for use case UC1."); - - // get environment variables - final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); - final int zooKeeperPort = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); - final int numSensors = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final double value = - Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), - "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String schemaRegistryUrl = - Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - - // create kafka record sender - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - KafkaRecordSender.<ActivePowerRecord>builder( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .defaultProperties(kafkaProperties) - .build(); - - // create workload generator - final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = - KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) - .threads(threads) - .period(Duration.of(periodMs, ChronoUnit.MILLIS)) - .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .generatorFunction( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .kafkaRecordSender(kafkaRecordSender) - .build(); - - // start - workloadGenerator.start(); + theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); } }