Skip to content
Snippets Groups Projects
Commit d80f620d authored by Sören Henning's avatar Sören Henning
Browse files

Add distributed load generation for UC3

parent ad1cdc09
No related branches found
No related tags found
No related merge requests found
......@@ -24,14 +24,11 @@ public class LoadGenerator {
private static final int WL_MAX_RECORDS = 150_000;
public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
LOGGER.info("Start workload generator for use case UC1.");
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int instanceId = getInstanceId();
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
......
......@@ -8,25 +8,27 @@ import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final int WL_MAX_RECORDS = 150_000;
public static void main(final String[] args) throws InterruptedException, IOException {
// uc3
LOGGER.info("Start workload generator for use case UC3.");
final int numSensor =
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int instanceId = getInstanceId();
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
......@@ -40,14 +42,12 @@ public class LoadGenerator {
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create sensorRegistry
// TODO replace as in UC1 or UC4
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0);
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
final int idStart = instanceId * WL_MAX_RECORDS;
final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
final List<String> sensors = IntStream.range(idStart, idEnd)
.mapToObj(i -> "s_" + i)
.collect(Collectors.toList());
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
......@@ -75,11 +75,16 @@ public class LoadGenerator {
}
private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren,
int nextId) {
for (int c = 0; c < numChildren; c++) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
private static int getInstanceId() {
final String podName = System.getenv("POD_NAME");
if (podName == null) {
return 0;
} else {
return Pattern.compile("-")
.splitAsStream(podName)
.reduce((p, x) -> x)
.map(Integer::parseInt)
.orElse(0);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment