diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md index 5ae10d16a50aaa16a76975d8127ef379508b1a37..a41c97d52f62f399c9289a15a64991d0fed228ce 100644 --- a/docs/theodolite-benchmarks/load-generator.md +++ b/docs/theodolite-benchmarks/load-generator.md @@ -56,6 +56,7 @@ The prebuilt container images can be configured with the following environment v | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | | `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` | +| `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | 1000 | | `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | | `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | | `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java index efb7db61cc4c81ec2d1ffd49141d6d70a23dacaa..eb80d25eb327f2e3dc10dc2977131ac7edfef69d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java @@ -43,6 +43,8 @@ public final class ConfigurationKeys { public static final String HTTP_ASYNC = "HTTP_ASYNC"; + public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java index 29ede821eefe171f377d58fce8d98eee28e8a277..3772e3b03e39092d8e9181bac0f553934ee73e2d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java @@ -122,7 +122,10 @@ class EnvVarLoadGeneratorFactory { final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.HTTP_ASYNC), Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT))); - recordSender = new HttpRecordSender<>(url, async); + final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS), + Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT))); + recordSender = new HttpRecordSender<>(url, async, Duration.ofSeconds(timeoutMs)); LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async); } else if (target == LoadGeneratorTarget.PUBSUB) { final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java index 124f11a979f3afad5507db86c68e9eeb42c64eb6..77706d824808132eaa7212194de0d69c346e4eba 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java @@ -24,7 +24,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< private static final int HTTP_OK = 200; - private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(1); + private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1); private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); @@ -36,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< private final boolean async; + private final Duration connectionTimeout; + private final List<Integer> validStatusCodes; /** @@ -44,7 +46,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * @param uri the {@link URI} records should be sent to */ public HttpRecordSender(final URI uri) { - this(uri, false, List.of(HTTP_OK)); + this(uri, false, DEFAULT_CONNECTION_TIMEOUT); } /** @@ -52,9 +54,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * * @param uri the {@link URI} records should be sent to * @param async whether HTTP requests should be sent asynchronous + * @param connectionTimeout timeout for the HTTP connection */ - public HttpRecordSender(final URI uri, final boolean async) { - this(uri, async, List.of(HTTP_OK)); + public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) { + this(uri, async, connectionTimeout, List.of(HTTP_OK)); } /** @@ -62,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * * @param uri the {@link URI} records should be sent to * @param async whether HTTP requests should be sent asynchronous + * @param connectionTimeout timeout for the HTTP connection * @param validStatusCodes a list of HTTP status codes which are considered as successful */ - public HttpRecordSender(final URI uri, final boolean async, + public HttpRecordSender( + final URI uri, + final boolean async, + final Duration connectionTimeout, final List<Integer> validStatusCodes) { this.uri = uri; this.async = async; + this.connectionTimeout = connectionTimeout; this.validStatusCodes = validStatusCodes; } @@ -76,7 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< final String json = this.gson.toJson(message); final HttpRequest request = HttpRequest.newBuilder() .uri(this.uri) - .timeout(CONNECTION_TIMEOUT) + .timeout(this.connectionTimeout) .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); @@ -98,8 +106,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< if (this.isSync()) { try { result.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException e) { LOGGER.error("Couldn't get result for request to {}.", this.uri, e); + } catch (final ExecutionException e) { // NOPMD + // Do nothing, Exception is already handled } } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java index 27edb97efc335400acf1d6244db0ce384ee20f59..4be9b5695a54dedac6df78e6ceb8230752301e22 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java @@ -18,6 +18,7 @@ public final class LoadGenerator { // Target: HTTP public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; public static final boolean HTTP_ASYNC_DEFAULT = false; + public static final long HTTP_TIMEOUT_MS_DEFAULT = 1_000; // Target: Kafka public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java index 7c565ace82698bf47f6b3711a28e08f87e8e412b..731dda6c74fd3cd6d74771f95896c2260ce6df29 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java @@ -46,4 +46,22 @@ public class HttpRecordSenderTest { .withRequestBody(equalTo(expectedJson))); // toJson } + @Test + public void testTimeout() { + this.wireMockRule.stubFor( + post(urlPathEqualTo("/")) + .willReturn( + aResponse() + .withFixedDelay(2_000) + .withStatus(200) + .withBody("received"))); + + final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34); + this.httpRecordSender.send(record); + + final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}"; + verify(exactly(1), postRequestedFor(urlEqualTo("/")) + .withRequestBody(equalTo(expectedJson))); // toJson + } + }