Skip to content
Snippets Groups Projects
Commit e5b43286 authored by Sören Henning's avatar Sören Henning
Browse files

Clean up load generator for use case 4

parent 67652eba
No related branches found
No related tags found
No related merge requests found
Pipeline #364 passed
...@@ -9,79 +9,57 @@ import java.util.concurrent.Executors; ...@@ -9,79 +9,57 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafkaSender.KafkaRecordSender; import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator { public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc1 // uc4
final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int numSensor =
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int periodMs =
final boolean sendRegistry = Boolean Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), final String kafkaBootstrapServers =
"localhost:9092"); Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); "localhost:9092");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaInputTopic =
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create sensorRegistry final Properties kafkaProperties = new Properties();
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); // kafkaProperties.put("acks", this.acknowledges);
addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
.collect(Collectors.toList()); final Random random = new Random();
// TODO Brauchen wir das ? final List<String> sensors =
if (sendRegistry) { IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList());
final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds"); for (final String sensor : sensors) {
Thread.sleep(30_000); final int initialDelay = random.nextInt(periodMs);
System.out.println("And woke up again :)"); executor.scheduleAtFixedRate(() -> {
} kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
final Properties kafkaProperties = new Properties(); System.out.println("Wait for termination...");
// kafkaProperties.put("acks", this.acknowledges); executor.awaitTermination(30, TimeUnit.DAYS);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); System.out.println("Will terminate now");
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); }
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) {
for (int c = 0; c < numChildren; c++) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment