diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index 381aef34dd42cc3cac9908480719a98fc55f3a27..842a668c1bc10fd63db738362b3babe3e2c63293 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Load Generator for UC1. @@ -41,11 +41,14 @@ public final class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -60,12 +63,16 @@ public final class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties); + + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 6cf7d81af21545b288c9bc24177575e6966b95de..eadb4ccd293929d0d46383813e1e98dffebc47a5 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -14,9 +14,9 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.configuration.events.Event; +import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { @@ -39,13 +39,16 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final boolean sendRegistry = Boolean .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -64,9 +67,16 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 80e3810f9d9a8e872d44f794e7a3f29ce8a3b2e0..5c7441845d83c9b36e0af3de09d2d688b2f8cbaf 100644 --- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; public class LoadGenerator { @@ -33,11 +33,14 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -53,8 +56,14 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 84df87a6a7a55b3b001db8037ca156d9b28fd39c..14e9e589dc5997b71b2d1437f1a7e8a78936a2bf 100644 --- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; public class LoadGenerator { @@ -33,11 +33,14 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -52,9 +55,16 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =