Skip to content
Snippets Groups Projects
Commit 06fe4973 authored by Sören Henning's avatar Sören Henning
Browse files

Set KeySpace correctly for benchmark UC2

parent 5aed17b7
Branches
Tags
1 merge request!33Fix workload generation for benchmark UC2
Pipeline #829 passed
......@@ -15,8 +15,7 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* The {@code LoadGenerator} creates a load in Kafka.
......@@ -48,7 +47,11 @@ public final class LoadGenerator {
LOGGER.info("Start workload generator for use case UC2.");
// get environment variables
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), DEEP);
final String hierarchy = System.getenv("HIERARCHY");
if (hierarchy != null && hierarchy.equals(DEEP)) {
LOGGER.error(
"The HIERARCHY parameter is no longer supported. Creating a full hierachy instead.");
}
final int numNestedGroups = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
......@@ -77,8 +80,8 @@ public final class LoadGenerator {
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// build sensor registry
final MutableSensorRegistry sensorRegistry =
buildSensorRegistry(hierarchy, numNestedGroups, numSensors);
final SensorRegistry sensorRegistry =
new SensorRegistryBuilder(numNestedGroups, numSensors).build();
// create kafka record sender
final Properties kafkaProperties = new Properties();
......@@ -101,7 +104,7 @@ public final class LoadGenerator {
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
......@@ -133,42 +136,4 @@ public final class LoadGenerator {
workloadGenerator.start();
}
private static MutableSensorRegistry buildSensorRegistry(
final String hierarchy,
final int numNestedGroups,
final int numSensors) {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (DEEP.equals(hierarchy)) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensors; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if ("full".equals(hierarchy)) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
return sensorRegistry;
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, final int startId) {
int nextId = startId;
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package theodolite.uc2.workloadgenerator;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* Builder for creating a nested {@link SensorRegistry} with {@code numNestedGroups} levels and
* {@code numSensors} children per group.
*/
public final class SensorRegistryBuilder {
private final int numNestedGroups;
private final int numSensors;
public SensorRegistryBuilder(final int numNestedGroups, final int numSensors) {
this.numNestedGroups = numNestedGroups;
this.numSensors = numSensors;
}
/**
* Creates the {@link SensorRegistry}.
*/
public SensorRegistry build() {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
this.addChildren(
sensorRegistry.getTopLevelSensor(),
this.numSensors,
1,
this.numNestedGroups,
0);
return sensorRegistry;
}
private int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, final int startId) {
int nextId = startId;
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId = this.addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package theodolite.uc2.workloadgenerator;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Test;
import titan.ccp.model.sensorregistry.AggregatedSensor;
import titan.ccp.model.sensorregistry.MachineSensor;
import titan.ccp.model.sensorregistry.Sensor;
import titan.ccp.model.sensorregistry.SensorRegistry;
public class SensorRegistryBuilderTest {
@Test
public void testStructure() {
final SensorRegistry registry = new SensorRegistryBuilder(2, 2).build();
final AggregatedSensor root = registry.getTopLevelSensor();
final Collection<Sensor> firstLevelSensors = root.getChildren();
Assert.assertEquals(2, firstLevelSensors.size());
for (final Sensor sensor : firstLevelSensors) {
Assert.assertTrue(sensor instanceof AggregatedSensor);
final AggregatedSensor aggregatedSensor = (AggregatedSensor) sensor;
final Collection<Sensor> secondLevelSensors = aggregatedSensor.getChildren();
Assert.assertEquals(2, secondLevelSensors.size());
for (final Sensor machineSensors : secondLevelSensors) {
Assert.assertTrue(machineSensors instanceof MachineSensor);
}
}
}
@Test
public void testMachineSensorNaming() {
final SensorRegistry registry = new SensorRegistryBuilder(2, 2).build();
final Set<String> machineSensors = registry.getMachineSensors().stream()
.map(s -> s.getIdentifier()).collect(Collectors.toSet());
Assert.assertTrue(machineSensors.contains("s_0"));
Assert.assertTrue(machineSensors.contains("s_1"));
Assert.assertTrue(machineSensors.contains("s_2"));
Assert.assertTrue(machineSensors.contains("s_3"));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment