Skip to content
Snippets Groups Projects
Commit bf02aa55 authored by Sören Henning's avatar Sören Henning
Browse files

Add draft for HTTP load generator

parent b9e21dea
No related branches found
No related tags found
1 merge request!202Add option to generate load via HTTP
Pipeline #5389 passed
package theodolite.commons.workloadgeneration;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import org.apache.avro.specific.SpecificRecord;
/**
* Sends monitoring records via HTTP.
*
* @param <T> {@link SpecificRecord} to send
*/
public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> {
// private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
private final HttpClient httpClient;
private final URI uri;
private final boolean async;
/**
* Create a new {@link HttpRecordSender}.
*/
public HttpRecordSender(final URI uri) {
this.httpClient = HttpClient.newBuilder().build();
this.uri = uri;
this.async = true;
}
@Override
public void send(final T message) {
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri) // TODO
.POST(HttpRequest.BodyPublishers.ofString(message.toString())) // TODO to JSON
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
// final BodyHandler<String> bodyHandler = BodyHandlers.ofString();
if (this.async) {
this.httpClient.sendAsync(request, bodyHandler);
// this.httpClient.sendAsync(request, bodyHandler).thenAccept(s -> System.out.println(s));
} else {
try {
this.httpClient.send(request, bodyHandler);
} catch (IOException | InterruptedException e) {
throw new IllegalStateException(e); // TODO
}
}
}
}
......@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
* @param <T> {@link SpecificRecord} to send
*/
public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> {
......
......@@ -170,6 +170,9 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
// TitanMessageHttpGeneratorFactory
// .withHttpConfig(
// URI.create("http://localhost:8080"))
TitanMessageGeneratorFactory
.withKafkaConfig(
kafkaBootstrapServers,
......
package theodolite.commons.workloadgeneration;
import java.net.URI;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s
* and sends them via HTTP.
*/
public final class TitanMessageHttpGeneratorFactory {
private final RecordSender<ActivePowerRecord> recordSender;
private TitanMessageHttpGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
this.recordSender = recordSender;
}
/**
* Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
*/
public MessageGenerator forConstantValue(final double value) {
return MessageGenerator.from(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
this.recordSender);
}
/**
* Create a new {@link TitanMessageHttpGeneratorFactory} for the given HTTP configuration.
*/
public static TitanMessageHttpGeneratorFactory withHttpConfig(final URI uri) {
final HttpRecordSender<ActivePowerRecord> httpRecordSender = new HttpRecordSender<>(uri);
return new TitanMessageHttpGeneratorFactory(httpRecordSender);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment