Skip to content
Snippets Groups Projects
Commit 7d5fef40 authored by Sören Henning's avatar Sören Henning
Browse files

Clean up

parent b95c89fa
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
......@@ -19,45 +18,39 @@ import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
// uc3
final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry = Boolean
.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create sensorRegistry
// TODO replace as in UC1 or UC4
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0);
final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers,
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
......@@ -77,7 +70,8 @@ public class LoadGenerator {
}
private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) {
private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren,
int nextId) {
for (int c = 0; c < numChildren; c++) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment