diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index a33fba0ea5688a2673b193d45a57693da56b1db4..3eb3e8d25b1f1aa6f302673727b8457a744fb503 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -15,8 +15,7 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; +import titan.ccp.model.sensorregistry.SensorRegistry; /** * The {@code LoadGenerator} creates a load in Kafka. @@ -48,7 +47,11 @@ public final class LoadGenerator { LOGGER.info("Start workload generator for use case UC2."); // get environment variables - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), DEEP); + final String hierarchy = System.getenv("HIERARCHY"); + if (hierarchy != null && hierarchy.equals(DEEP)) { + LOGGER.error( + "The HIERARCHY parameter is no longer supported. Creating a full hierachy instead."); + } final int numNestedGroups = Integer .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); @@ -77,8 +80,8 @@ public final class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); // build sensor registry - final MutableSensorRegistry sensorRegistry = - buildSensorRegistry(hierarchy, numNestedGroups, numSensors); + final SensorRegistry sensorRegistry = + new SensorRegistryBuilder(numNestedGroups, numSensors).build(); // create kafka record sender final Properties kafkaProperties = new Properties(); @@ -101,7 +104,7 @@ public final class LoadGenerator { final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() .instances(instances) - .keySpace(new KeySpace("s_", numSensors)) + .keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size())) .threads(threads) .period(Duration.of(periodMs, ChronoUnit.MILLIS)) .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) @@ -133,42 +136,4 @@ public final class LoadGenerator { workloadGenerator.start(); } - private static MutableSensorRegistry buildSensorRegistry( - final String hierarchy, - final int numNestedGroups, - final int numSensors) { - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (DEEP.equals(hierarchy)) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensors; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if ("full".equals(hierarchy)) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - return sensorRegistry; - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, final int startId) { - int nextId = startId; - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - } diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..7c34ac89471386f4ddd508a304f2197602beab27 --- /dev/null +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilder.java @@ -0,0 +1,51 @@ +package theodolite.uc2.workloadgenerator; + +import titan.ccp.model.sensorregistry.MutableAggregatedSensor; +import titan.ccp.model.sensorregistry.MutableSensorRegistry; +import titan.ccp.model.sensorregistry.SensorRegistry; + +/** + * Builder for creating a nested {@link SensorRegistry} with {@code numNestedGroups} levels and + * {@code numSensors} children per group. + */ +public final class SensorRegistryBuilder { + + private final int numNestedGroups; + private final int numSensors; + + public SensorRegistryBuilder(final int numNestedGroups, final int numSensors) { + this.numNestedGroups = numNestedGroups; + this.numSensors = numSensors; + } + + /** + * Creates the {@link SensorRegistry}. + */ + public SensorRegistry build() { + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + this.addChildren( + sensorRegistry.getTopLevelSensor(), + this.numSensors, + 1, + this.numNestedGroups, + 0); + return sensorRegistry; + } + + private int addChildren(final MutableAggregatedSensor parent, final int numChildren, + final int lvl, final int maxLvl, final int startId) { + int nextId = startId; + for (int c = 0; c < numChildren; c++) { + if (lvl == maxLvl) { + parent.addChildMachineSensor("s_" + nextId); + nextId++; + } else { + final MutableAggregatedSensor newParent = + parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); + nextId = this.addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); + } + } + return nextId; + } + +} diff --git a/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java b/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..17b208edac4acafa92b7a75e053e2fe97a9afdb6 --- /dev/null +++ b/uc2-workload-generator/src/test/java/theodolite/uc2/workloadgenerator/SensorRegistryBuilderTest.java @@ -0,0 +1,46 @@ +package theodolite.uc2.workloadgenerator; + + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Test; +import titan.ccp.model.sensorregistry.AggregatedSensor; +import titan.ccp.model.sensorregistry.MachineSensor; +import titan.ccp.model.sensorregistry.Sensor; +import titan.ccp.model.sensorregistry.SensorRegistry; + +public class SensorRegistryBuilderTest { + + @Test + public void testStructure() { + final SensorRegistry registry = new SensorRegistryBuilder(2, 2).build(); + final AggregatedSensor root = registry.getTopLevelSensor(); + final Collection<Sensor> firstLevelSensors = root.getChildren(); + Assert.assertEquals(2, firstLevelSensors.size()); + for (final Sensor sensor : firstLevelSensors) { + Assert.assertTrue(sensor instanceof AggregatedSensor); + final AggregatedSensor aggregatedSensor = (AggregatedSensor) sensor; + final Collection<Sensor> secondLevelSensors = aggregatedSensor.getChildren(); + Assert.assertEquals(2, secondLevelSensors.size()); + for (final Sensor machineSensors : secondLevelSensors) { + Assert.assertTrue(machineSensors instanceof MachineSensor); + + } + } + } + + @Test + public void testMachineSensorNaming() { + final SensorRegistry registry = new SensorRegistryBuilder(2, 2).build(); + final Set<String> machineSensors = registry.getMachineSensors().stream() + .map(s -> s.getIdentifier()).collect(Collectors.toSet()); + + Assert.assertTrue(machineSensors.contains("s_0")); + Assert.assertTrue(machineSensors.contains("s_1")); + Assert.assertTrue(machineSensors.contains("s_2")); + Assert.assertTrue(machineSensors.contains("s_3")); + } + +}