Skip to content
Snippets Groups Projects
Commit dc814193 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into generic-resource-support

parents de0ad9f9 fff50f26
No related branches found
No related tags found
1 merge request!257Allow Theodolite to deploy arbitrary resources
Pipeline #7039 passed
......@@ -56,6 +56,7 @@ The prebuilt container images can be configured with the following environment v
| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) |
| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | |
| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` |
| `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | 1000 |
| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input |
| `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | |
| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | |
......
......@@ -43,6 +43,8 @@ public final class ConfigurationKeys {
public static final String HTTP_ASYNC = "HTTP_ASYNC";
public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
......
......@@ -122,7 +122,10 @@ class EnvVarLoadGeneratorFactory {
final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_ASYNC),
Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async);
final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS),
Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async, Duration.ofMillis(timeoutMs));
LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async);
} else if (target == LoadGeneratorTarget.PUBSUB) {
final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
......
......@@ -24,7 +24,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private static final int HTTP_OK = 200;
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
......@@ -36,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private final boolean async;
private final Duration connectionTimeout;
private final List<Integer> validStatusCodes;
/**
......@@ -44,7 +46,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
* @param uri the {@link URI} records should be sent to
*/
public HttpRecordSender(final URI uri) {
this(uri, false, List.of(HTTP_OK));
this(uri, false, DEFAULT_CONNECTION_TIMEOUT);
}
/**
......@@ -52,9 +54,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
*/
public HttpRecordSender(final URI uri, final boolean async) {
this(uri, async, List.of(HTTP_OK));
public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) {
this(uri, async, connectionTimeout, List.of(HTTP_OK));
}
/**
......@@ -62,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
* @param validStatusCodes a list of HTTP status codes which are considered as successful
*/
public HttpRecordSender(final URI uri, final boolean async,
public HttpRecordSender(
final URI uri,
final boolean async,
final Duration connectionTimeout,
final List<Integer> validStatusCodes) {
this.uri = uri;
this.async = async;
this.connectionTimeout = connectionTimeout;
this.validStatusCodes = validStatusCodes;
}
......@@ -76,7 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri)
.timeout(CONNECTION_TIMEOUT)
.timeout(this.connectionTimeout)
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
......@@ -98,8 +106,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
if (this.isSync()) {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
} catch (final InterruptedException e) {
LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
} catch (final ExecutionException e) { // NOPMD
// Do nothing, Exception is already handled
}
}
}
......
......@@ -18,6 +18,7 @@ public final class LoadGenerator {
// Target: HTTP
public static final String HTTP_URI_DEFAULT = "http://localhost:8080";
public static final boolean HTTP_ASYNC_DEFAULT = false;
public static final long HTTP_TIMEOUT_MS_DEFAULT = 1_000;
// Target: Kafka
public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS
......
......@@ -46,4 +46,22 @@ public class HttpRecordSenderTest {
.withRequestBody(equalTo(expectedJson))); // toJson
}
@Test
public void testTimeout() {
this.wireMockRule.stubFor(
post(urlPathEqualTo("/"))
.willReturn(
aResponse()
.withFixedDelay(2_000)
.withStatus(200)
.withBody("received")));
final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34);
this.httpRecordSender.send(record);
final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}";
verify(exactly(1), postRequestedFor(urlEqualTo("/"))
.withRequestBody(equalTo(expectedJson))); // toJson
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment