From bf02aa55ed43f89013ecc41f12c423396d0891cd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 2 Dec 2021 16:54:30 +0100
Subject: [PATCH] Add draft for HTTP load generator

---
 .../workloadgeneration/HttpRecordSender.java  | 55 +++++++++++++++++++
 .../workloadgeneration/KafkaRecordSender.java |  2 +-
 .../workloadgeneration/LoadGenerator.java     |  3 +
 .../TitanMessageHttpGeneratorFactory.java     | 36 ++++++++++++
 4 files changed, 95 insertions(+), 1 deletion(-)
 create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java
 create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java

diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java
new file mode 100644
index 000000000..8d64d59a3
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java
@@ -0,0 +1,55 @@
+package theodolite.commons.workloadgeneration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse.BodyHandler;
+import java.net.http.HttpResponse.BodyHandlers;
+import org.apache.avro.specific.SpecificRecord;
+
+/**
+ * Sends monitoring records via HTTP.
+ *
+ * @param <T> {@link SpecificRecord} to send
+ */
+public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> {
+
+  // private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
+
+  private final HttpClient httpClient;
+
+  private final URI uri;
+
+  private final boolean async;
+
+  /**
+   * Create a new {@link HttpRecordSender}.
+   */
+  public HttpRecordSender(final URI uri) {
+    this.httpClient = HttpClient.newBuilder().build();
+    this.uri = uri;
+    this.async = true;
+  }
+
+  @Override
+  public void send(final T message) {
+    final HttpRequest request = HttpRequest.newBuilder()
+        .uri(this.uri) // TODO
+        .POST(HttpRequest.BodyPublishers.ofString(message.toString())) // TODO to JSON
+        .build();
+    final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
+    // final BodyHandler<String> bodyHandler = BodyHandlers.ofString();
+    if (this.async) {
+      this.httpClient.sendAsync(request, bodyHandler);
+      // this.httpClient.sendAsync(request, bodyHandler).thenAccept(s -> System.out.println(s));
+    } else {
+      try {
+        this.httpClient.send(request, bodyHandler);
+      } catch (IOException | InterruptedException e) {
+        throw new IllegalStateException(e); // TODO
+      }
+    }
+  }
+
+}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
index 6e4a43271..e91e3877b 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 /**
  * Sends monitoring records to Kafka.
  *
- * @param <T> {@link IMonitoringRecord} to send
+ * @param <T> {@link SpecificRecord} to send
  */
 public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> {
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
index a9a1ce65a..4bd4e4661 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
@@ -170,6 +170,9 @@ public final class LoadGenerator {
             new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
             Duration.ofMillis(periodMs)))
         .setGeneratorConfig(new LoadGeneratorConfig(
+            // TitanMessageHttpGeneratorFactory
+            // .withHttpConfig(
+            // URI.create("http://localhost:8080"))
             TitanMessageGeneratorFactory
                 .withKafkaConfig(
                     kafkaBootstrapServers,
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java
new file mode 100644
index 000000000..8ea176e44
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageHttpGeneratorFactory.java
@@ -0,0 +1,36 @@
+package theodolite.commons.workloadgeneration;
+
+import java.net.URI;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s
+ * and sends them via HTTP.
+ */
+public final class TitanMessageHttpGeneratorFactory {
+
+  private final RecordSender<ActivePowerRecord> recordSender;
+
+  private TitanMessageHttpGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
+    this.recordSender = recordSender;
+  }
+
+  /**
+   * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a
+   * constant value.
+   */
+  public MessageGenerator forConstantValue(final double value) {
+    return MessageGenerator.from(
+        sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
+        this.recordSender);
+  }
+
+  /**
+   * Create a new {@link TitanMessageHttpGeneratorFactory} for the given HTTP configuration.
+   */
+  public static TitanMessageHttpGeneratorFactory withHttpConfig(final URI uri) {
+    final HttpRecordSender<ActivePowerRecord> httpRecordSender = new HttpRecordSender<>(uri);
+    return new TitanMessageHttpGeneratorFactory(httpRecordSender);
+  }
+
+}
-- 
GitLab