Skip to content
Snippets Groups Projects
Commit 1df0bc7a authored by Sören Henning's avatar Sören Henning
Browse files

Make HTTP connection timeout configurable

parent 2e509237
No related branches found
No related tags found
No related merge requests found
Pipeline #7037 canceled
......@@ -56,6 +56,7 @@ The prebuilt container images can be configured with the following environment v
| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) |
| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | |
| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` |
| `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | 1000 |
| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input |
| `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | |
| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | |
......
......@@ -43,6 +43,8 @@ public final class ConfigurationKeys {
public static final String HTTP_ASYNC = "HTTP_ASYNC";
public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
......
......@@ -122,7 +122,10 @@ class EnvVarLoadGeneratorFactory {
final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_ASYNC),
Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async);
final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS),
Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async, Duration.ofSeconds(timeoutMs));
LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async);
} else if (target == LoadGeneratorTarget.PUBSUB) {
final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
......
......@@ -24,7 +24,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private static final int HTTP_OK = 200;
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
......@@ -36,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private final boolean async;
private final Duration connectionTimeout;
private final List<Integer> validStatusCodes;
/**
......@@ -44,7 +46,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
* @param uri the {@link URI} records should be sent to
*/
public HttpRecordSender(final URI uri) {
this(uri, false, List.of(HTTP_OK));
this(uri, false, DEFAULT_CONNECTION_TIMEOUT);
}
/**
......@@ -52,9 +54,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
*/
public HttpRecordSender(final URI uri, final boolean async) {
this(uri, async, List.of(HTTP_OK));
public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) {
this(uri, async, connectionTimeout, List.of(HTTP_OK));
}
/**
......@@ -62,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
* @param validStatusCodes a list of HTTP status codes which are considered as successful
*/
public HttpRecordSender(final URI uri, final boolean async,
public HttpRecordSender(
final URI uri,
final boolean async,
final Duration connectionTimeout,
final List<Integer> validStatusCodes) {
this.uri = uri;
this.async = async;
this.connectionTimeout = connectionTimeout;
this.validStatusCodes = validStatusCodes;
}
......@@ -76,7 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri)
.timeout(CONNECTION_TIMEOUT)
.timeout(this.connectionTimeout)
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
......@@ -98,8 +106,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
if (this.isSync()) {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
} catch (final InterruptedException e) {
LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
} catch (final ExecutionException e) { // NOPMD
// Do nothing, Exception is already handled
}
}
}
......
......@@ -18,6 +18,7 @@ public final class LoadGenerator {
// Target: HTTP
public static final String HTTP_URI_DEFAULT = "http://localhost:8080";
public static final boolean HTTP_ASYNC_DEFAULT = false;
public static final long HTTP_TIMEOUT_MS_DEFAULT = 1_000;
// Target: Kafka
public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS
......
......@@ -46,4 +46,22 @@ public class HttpRecordSenderTest {
.withRequestBody(equalTo(expectedJson))); // toJson
}
@Test
public void testTimeout() {
this.wireMockRule.stubFor(
post(urlPathEqualTo("/"))
.willReturn(
aResponse()
.withFixedDelay(2_000)
.withStatus(200)
.withBody("received")));
final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34);
this.httpRecordSender.send(record);
final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}";
verify(exactly(1), postRequestedFor(urlEqualTo("/"))
.withRequestBody(equalTo(expectedJson))); // toJson
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment