Skip to content
Snippets Groups Projects
Commit df353559 authored by Sören Henning's avatar Sören Henning
Browse files

Migrate use case UC1 to new load generator

parent 8527d2d8
No related branches found
No related tags found
No related merge requests found
...@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true ...@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true cleanup.remove_trailing_whitespaces_all=true
...@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true ...@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99 org.eclipse.jdt.ui.staticondemandthreshold=99
org.eclipse.jdt.ui.text.custom_code_templates=
sp_cleanup.add_default_serial_version_id=true sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true sp_cleanup.add_missing_annotations=true
......
package theodolite.uc1.workloadgenerator; package theodolite.uc1.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Load Generator for UC1. * Load Generator for UC1.
...@@ -22,73 +11,13 @@ public final class LoadGenerator { ...@@ -22,73 +11,13 @@ public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final long MAX_DURATION_IN_DAYS = 30L;
private LoadGenerator() {} private LoadGenerator() {}
/** /**
* Entry point. * Start load generator for use case UC1.
*/ */
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
LOGGER.info("Start workload generator for use case UC1."); LOGGER.info("Start workload generator for use case UC1.");
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
// get environment variables
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final double value =
Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
"4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String schemaRegistryUrl =
Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
KafkaRecordSender.<ActivePowerRecord>builder(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.defaultProperties(kafkaProperties)
.build();
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment