diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 45ac1d5bb9c21a1b6303de2f248d08b69c02fc28..e4787db047232f6392a52ce81c248f11c1dbaaf2 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -23,6 +23,8 @@ public final class ConfigurationKeys { public static final String THREADS = "THREADS"; + public static final String TARGET = "TARGET"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; @@ -35,6 +37,8 @@ public final class ConfigurationKeys { public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; + public static final String HTTP_URI = "HTTP_URI"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java index 8d64d59a3d3784868ddc00fe9834f87a00d087d4..baa230b76e3cfa5d166c4be396f7685dc36e7487 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java @@ -1,12 +1,18 @@ package theodolite.commons.workloadgeneration; -import java.io.IOException; +import com.google.gson.Gson; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.avro.specific.SpecificRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Sends monitoring records via HTTP. @@ -15,39 +21,69 @@ import org.apache.avro.specific.SpecificRecord; */ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> { - // private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); - private final HttpClient httpClient; + private final Gson gson = new Gson(); + + private final HttpClient httpClient = HttpClient.newBuilder().build(); private final URI uri; private final boolean async; + private final List<Integer> validStatusCodes; + /** * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to */ public HttpRecordSender(final URI uri) { - this.httpClient = HttpClient.newBuilder().build(); + this(uri, true, List.of(200)); + } + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + * @param async whether HTTP requests should be sent asynchronous + * @param validStatusCodes a list of HTTP status codes which are considered as successful + */ + public HttpRecordSender(final URI uri, final boolean async, + final List<Integer> validStatusCodes) { this.uri = uri; - this.async = true; + this.async = async; + this.validStatusCodes = validStatusCodes; } @Override public void send(final T message) { + final String json = this.gson.toJson(message); final HttpRequest request = HttpRequest.newBuilder() - .uri(this.uri) // TODO - .POST(HttpRequest.BodyPublishers.ofString(message.toString())) // TODO to JSON + .uri(this.uri) + .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); // final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); + + final CompletableFuture<HttpResponse<Void>> result = + this.httpClient.sendAsync(request, bodyHandler) + .whenComplete((response, exception) -> { + if (exception != null) { // NOPMD + LOGGER.warn("Couldn't send request to {}.", this.uri, exception); + } else if (!this.validStatusCodes.contains(response.statusCode())) { // NOPMD + LOGGER.warn("Received status code {} for request to {}.", response.statusCode(), + this.uri); + } else { + LOGGER.debug("Sucessfully sent request to {} (status={}).", this.uri, + response.statusCode()); + } + }); if (this.async) { - this.httpClient.sendAsync(request, bodyHandler); - // this.httpClient.sendAsync(request, bodyHandler).thenAccept(s -> System.out.println(s)); - } else { try { - this.httpClient.send(request, bodyHandler); - } catch (IOException | InterruptedException e) { - throw new IllegalStateException(e); // TODO + result.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Couldn't get result for request to {}.", this.uri, e); } } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 9507f20492e8f2b4b0517741d0c4b8bc50399557..206a7acd2eb597a55c3c0429dcee11c29c7c88db 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,11 +1,13 @@ package theodolite.commons.workloadgeneration; +import java.net.URI; import java.time.Duration; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. @@ -20,9 +22,11 @@ public final class LoadGenerator { private static final int PERIOD_MS_DEFAULT = 1000; private static final int VALUE_DEFAULT = 10; private static final int THREADS_DEFAULT = 4; + private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -134,6 +138,43 @@ public final class LoadGenerator { clusterConfig.setClusterNamePrefix(portAutoIncrement); } + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + TARGET_DEFAULT.value)); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI uri = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URI), + HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(uri); + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.NUM_SENSORS), Integer.toString(NUMBER_OF_KEYS_DEFAULT))); @@ -146,22 +187,6 @@ public final class LoadGenerator { final int threads = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.THREADS), Integer.toString(THREADS_DEFAULT))); - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); return new LoadGenerator() .setClusterConfig(clusterConfig) @@ -170,11 +195,7 @@ public final class LoadGenerator { Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( TitanRecordGeneratorFactory.forConstantValue(value), - TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl))) - // new HttpRecordSender<ActivePowerRecord>(URI.create("http://localhost:8080")))) + recordSender)) .withThreads(threads); } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java new file mode 100644 index 0000000000000000000000000000000000000000..567c0388a49e42840a9e1eab0a448231669f2e47 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -0,0 +1,22 @@ +package theodolite.commons.workloadgeneration; + +import java.util.stream.Stream; + +enum LoadGeneratorTarget { + + KAFKA("kafka"), HTTP("http"); + + final String value; + + LoadGeneratorTarget(final String value) { + this.value = value; + } + + static LoadGeneratorTarget from(final String value) { + return Stream.of(LoadGeneratorTarget.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Target '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java index 20c094ddcc7ff110a25aaffa494766e89d4d2475..49004839a9c8fd280aba5006a1f08c2acb3c3136 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java @@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration; import org.junit.Assert; import org.junit.Test; -import theodolite.commons.workloadgeneration.KeySpace; public class KeySpaceTest { diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java new file mode 100644 index 0000000000000000000000000000000000000000..644ffad9a4d2732f72ac307294d1311eba3a9ce8 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java @@ -0,0 +1,26 @@ +package theodolite.commons.workloadgeneration; + +import org.junit.Assert; +import org.junit.Test; + +public class LoadGeneratorTargetTest { + + @Test + public void testFromKafka() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("kafka"); + Assert.assertEquals(LoadGeneratorTarget.KAFKA, target); + } + + @Test + public void testFromHttp() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("http"); + Assert.assertEquals(LoadGeneratorTarget.HTTP, target); + } + + @Test(expected = IllegalArgumentException.class) + public void testFromInvalidTarget() { + LoadGeneratorTarget.from("<invalid-target>"); + } + + +}