diff --git a/uc1-workload-generator/src/main/java/uc2/workloadGenerator/ConfigPublisher.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java similarity index 100% rename from uc1-workload-generator/src/main/java/uc2/workloadGenerator/ConfigPublisher.java rename to uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java diff --git a/uc1-workload-generator/src/main/java/uc2/workloadGenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java similarity index 100% rename from uc1-workload-generator/src/main/java/uc2/workloadGenerator/LoadGenerator.java rename to uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java diff --git a/uc1-workload-generator/src/main/java/uc2/workloadGenerator/LoadGeneratorExtrem.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java similarity index 100% rename from uc1-workload-generator/src/main/java/uc2/workloadGenerator/LoadGeneratorExtrem.java rename to uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java b/uc3-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java similarity index 98% rename from uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java rename to uc3-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java index b46128c8ebfd52aeeb7127777bb6530761f35181..6c67cf722b4dce87f0bc197ba80f8f117f82198e 100644 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java +++ b/uc3-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge; +package kafkaSender; import java.util.Properties; import java.util.function.Function; diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java b/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java deleted file mode 100644 index a50bbd942fccf5f8899414fe8cb7b82ad6953f87..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java +++ /dev/null @@ -1,21 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.Objects; - -public class ExperimentorBigData { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter"); - - if (modus.equals("LoadGenerator")) { - LoadGenerator.main(args); - } else if (modus.equals("LoadGeneratorExtrem")) { - LoadGeneratorExtrem.main(args); - } else if (modus.equals("LoadCounter")) { - LoadCounter.main(args); - } - - } -} diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java b/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java deleted file mode 100644 index 798f3014446605afab2cf20f3232896baab02802..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java +++ /dev/null @@ -1,84 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import com.google.common.math.StatsAccumulator; -import java.time.Duration; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; - -public class LoadCounter { - - public static void main(final String[] args) throws InterruptedException { - - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaOutputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output"); - - final Properties props = new Properties(); - props.setProperty("bootstrap.servers", kafkaBootstrapServers); - props.setProperty("group.id", "load-counter"); - props.setProperty("enable.auto.commit", "true"); - props.setProperty("auto.commit.interval.ms", "1000"); - props.setProperty("max.poll.records", "1000000"); - props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB - props.setProperty("key.deserializer", - "org.apache.kafka.common.serialization.StringDeserializer"); - props.setProperty("value.deserializer", - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - final Deserializer<AggregatedActivePowerRecord> deserializer = - IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory()); - - final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); - consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic)); - - executor.scheduleAtFixedRate( - () -> { - final long time = System.currentTimeMillis(); - final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500)); - - long inputCount = 0; - for (final ConsumerRecord<String, byte[]> inputRecord : records - .records(kafkaInputTopic)) { - inputCount++; - } - - long outputCount = 0; - final StatsAccumulator statsAccumulator = new StatsAccumulator(); - for (final ConsumerRecord<String, byte[]> outputRecord : records - .records(kafkaOutputTopic)) { - outputCount++; - final AggregatedActivePowerRecord record = - deserializer.deserialize(kafkaOutputTopic, outputRecord.value()); - final long latency = time - record.getTimestamp(); - statsAccumulator.add(latency); - } - - final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0; - - final long elapsedTime = System.currentTimeMillis() - time; - System.out - .println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount); - System.out - .println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount); - }, - 0, - 1, - TimeUnit.SECONDS); - } - -} diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java b/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java deleted file mode 100644 index 97a7c84f872f3ab676128d903ae121c376bf7608..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java +++ /dev/null @@ -1,124 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGenerator { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( - kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), - kafkaProperties); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate( - () -> { - kafkaRecordSender.write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - }, - initialDelay, - periodMs, - TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc3-workload-generator/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc3-workload-generator/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 706cf79022b2485b349bfe7ae144145dda013d20..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,92 +0,0 @@ -package uc1.streamprocessing; - -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic); - final Properties properties = PropertiesBuilder - .bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc3-workload-generator/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc3-workload-generator/src/main/java/uc1/streamprocessing/TopologyBuilder.java deleted file mode 100644 index 140c592f4e33334fbced6a80b82173c00d19eb25..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/uc1/streamprocessing/TopologyBuilder.java +++ /dev/null @@ -1,45 +0,0 @@ -package uc1.streamprocessing; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; - -/** - * Builds Kafka Stream Topology for the History microservice. - */ -public class TopologyBuilder { - - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - - private final String inputTopic; - - private final StreamsBuilder builder = new StreamsBuilder(); - - /** - * Create a new {@link TopologyBuilder} using the given topics. - */ - public TopologyBuilder(final String inputTopic) { - this.inputTopic = inputTopic; - } - - /** - * Build the {@link Topology} for the History microservice. - */ - public Topology build() { - - this.builder - .stream(this.inputTopic, Consumed.with( - Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - .mapValues(value -> value.getValueInW()) - .foreach((key, measurement) -> LOGGER - .info("Key: " + key + " Value: " + measurement)); - - return this.builder.build(); - } -} diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/ConfigPublisher.java similarity index 97% rename from uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java rename to uc3-workload-generator/src/main/java/uc3/workloadGenerator/ConfigPublisher.java index d5f55a4ab7ca265b241e880363975070e9952c45..ab36397d810c276cf6e1e134364650a64d5997d1 100644 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java +++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge.expbigdata19; +package uc3.workloadGenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; diff --git a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..35defc90a06f8c6a834c54fdd69388106b5c3ceb --- /dev/null +++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java @@ -0,0 +1,87 @@ +package uc3.workloadGenerator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import kafkaSender.KafkaRecordSender; +import org.apache.kafka.clients.producer.ProducerConfig; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.sensorregistry.MutableAggregatedSensor; +import titan.ccp.model.sensorregistry.MutableSensorRegistry; +import titan.ccp.models.records.ActivePowerRecord; + +public class LoadGenerator { + + public static void main(final String[] args) throws InterruptedException, IOException { + // uc1 + + final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); + final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final boolean sendRegistry = Boolean + .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); + final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); + final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); + final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); + final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + + // create sensorRegistry + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); + + final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) + .collect(Collectors.toList()); + + if (sendRegistry) { + final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + System.out.println("Configuration sent."); + + System.out.println("Now wait 30 seconds"); + Thread.sleep(30_000); + System.out.println("And woke up again :)"); + } + + final Properties kafkaProperties = new Properties(); + // kafkaProperties.put("acks", this.acknowledges); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers, + kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + for (final String sensor : sensors) { + System.out.println("working"); + final int initialDelay = random.nextInt(periodMs); + executor.scheduleAtFixedRate(() -> { + kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + }, initialDelay, periodMs, TimeUnit.MILLISECONDS); + } + + System.out.println("Wait for termination..."); + executor.awaitTermination(30, TimeUnit.DAYS); + System.out.println("Will terminate now"); + + } + + private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) { + for (int c = 0; c < numChildren; c++) { + parent.addChildMachineSensor("s_" + nextId); + nextId++; + } + } + +} diff --git a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGeneratorExtrem.java similarity index 98% rename from uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java rename to uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGeneratorExtrem.java index 5bfb6ad488e90f39ded2b9e4cb57d10099f1c538..2361cf2c04a1bc3bd05af089e6bdf72213eb6cb1 100644 --- a/uc3-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java +++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGeneratorExtrem.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge.expbigdata19; +package uc3.workloadGenerator; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -9,9 +9,9 @@ import java.util.Objects; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; +import kafkaSender.KafkaRecordSender; import org.apache.kafka.clients.producer.ProducerConfig; import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;