diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..8d64d59a3d3784868ddc00fe9834f87a00d087d4 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java @@ -0,0 +1,55 @@ +package theodolite.commons.workloadgeneration; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import org.apache.avro.specific.SpecificRecord; + +/** + * Sends monitoring records via HTTP. + * + * @param <T> {@link SpecificRecord} to send + */ +public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> { + + // private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); + + private final HttpClient httpClient; + + private final URI uri; + + private final boolean async; + + /** + * Create a new {@link HttpRecordSender}. + */ + public HttpRecordSender(final URI uri) { + this.httpClient = HttpClient.newBuilder().build(); + this.uri = uri; + this.async = true; + } + + @Override + public void send(final T message) { + final HttpRequest request = HttpRequest.newBuilder() + .uri(this.uri) // TODO + .POST(HttpRequest.BodyPublishers.ofString(message.toString())) // TODO to JSON + .build(); + final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); + // final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); + if (this.async) { + this.httpClient.sendAsync(request, bodyHandler); + // this.httpClient.sendAsync(request, bodyHandler).thenAccept(s -> System.out.println(s)); + } else { + try { + this.httpClient.send(request, bodyHandler); + } catch (IOException | InterruptedException e) { + throw new IllegalStateException(e); // TODO + } + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index 6e4a43271fbf1e0193c2d39569a0814d1f7935cd..e91e3877bc3aed7bf91ebb5f7a15ed4be5f26217 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * - * @param <T> {@link IMonitoringRecord} to send + * @param <T> {@link SpecificRecord} to send */ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index a9a1ce65ac32e3508299c99a38ecd21e4c9461cf..4bd4e4661fa7949494d4b510c24bdcf9e6ce80dd 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -170,6 +170,9 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( + // TitanMessageHttpGeneratorFactory + // .withHttpConfig( + // URI.create("http://localhost:8080")) TitanMessageGeneratorFactory .withKafkaConfig( kafkaBootstrapServers, diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..8ea176e4476ee77f20d2ec5c2650231c29877d6d --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java @@ -0,0 +1,36 @@ +package theodolite.commons.workloadgeneration; + +import java.net.URI; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s + * and sends them via HTTP. + */ +public final class TitanMessageHttpGeneratorFactory { + + private final RecordSender<ActivePowerRecord> recordSender; + + private TitanMessageHttpGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) { + this.recordSender = recordSender; + } + + /** + * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a + * constant value. + */ + public MessageGenerator forConstantValue(final double value) { + return MessageGenerator.from( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value), + this.recordSender); + } + + /** + * Create a new {@link TitanMessageHttpGeneratorFactory} for the given HTTP configuration. + */ + public static TitanMessageHttpGeneratorFactory withHttpConfig(final URI uri) { + final HttpRecordSender<ActivePowerRecord> httpRecordSender = new HttpRecordSender<>(uri); + return new TitanMessageHttpGeneratorFactory(httpRecordSender); + } + +}