Skip to content
Snippets Groups Projects
Commit f769f8e7 authored by Sören Henning's avatar Sören Henning
Browse files

Configure HTTP record sender via env variable

parent 0c0ec0c8
Branches
Tags
1 merge request!202Add option to generate load via HTTP
Pipeline #6062 failed
......@@ -23,6 +23,8 @@ public final class ConfigurationKeys {
public static final String THREADS = "THREADS";
public static final String TARGET = "TARGET";
public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
......@@ -35,6 +37,8 @@ public final class ConfigurationKeys {
public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
public static final String HTTP_URI = "HTTP_URI";
private ConfigurationKeys() {}
}
package theodolite.commons.workloadgeneration;
import java.io.IOException;
import com.google.gson.Gson;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends monitoring records via HTTP.
......@@ -15,39 +21,69 @@ import org.apache.avro.specific.SpecificRecord;
*/
public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> {
// private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
private final HttpClient httpClient;
private final Gson gson = new Gson();
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final URI uri;
private final boolean async;
private final List<Integer> validStatusCodes;
/**
* Create a new {@link HttpRecordSender}.
*
* @param uri the {@link URI} records should be sent to
*/
public HttpRecordSender(final URI uri) {
this.httpClient = HttpClient.newBuilder().build();
this(uri, true, List.of(200));
}
/**
* Create a new {@link HttpRecordSender}.
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param validStatusCodes a list of HTTP status codes which are considered as successful
*/
public HttpRecordSender(final URI uri, final boolean async,
final List<Integer> validStatusCodes) {
this.uri = uri;
this.async = true;
this.async = async;
this.validStatusCodes = validStatusCodes;
}
@Override
public void send(final T message) {
final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri) // TODO
.POST(HttpRequest.BodyPublishers.ofString(message.toString())) // TODO to JSON
.uri(this.uri)
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
// final BodyHandler<String> bodyHandler = BodyHandlers.ofString();
if (this.async) {
this.httpClient.sendAsync(request, bodyHandler);
// this.httpClient.sendAsync(request, bodyHandler).thenAccept(s -> System.out.println(s));
final CompletableFuture<HttpResponse<Void>> result =
this.httpClient.sendAsync(request, bodyHandler)
.whenComplete((response, exception) -> {
if (exception != null) { // NOPMD
LOGGER.warn("Couldn't send request to {}.", this.uri, exception);
} else if (!this.validStatusCodes.contains(response.statusCode())) { // NOPMD
LOGGER.warn("Received status code {} for request to {}.", response.statusCode(),
this.uri);
} else {
LOGGER.debug("Sucessfully sent request to {} (status={}).", this.uri,
response.statusCode());
}
});
if (this.async) {
try {
this.httpClient.send(request, bodyHandler);
} catch (IOException | InterruptedException e) {
throw new IllegalStateException(e); // TODO
result.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
}
}
}
......
package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Theodolite load generator.
......@@ -20,9 +22,11 @@ public final class LoadGenerator {
private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10;
private static final int THREADS_DEFAULT = 4;
private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String HTTP_URI_DEFAULT = "http://localhost:8080";
private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition;
......@@ -134,18 +138,12 @@ public final class LoadGenerator {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT)));
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
TARGET_DEFAULT.value));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
......@@ -162,6 +160,33 @@ public final class LoadGenerator {
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI uri = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URI),
HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(uri);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT)));
return new LoadGenerator()
.setClusterConfig(clusterConfig)
......@@ -170,11 +195,7 @@ public final class LoadGenerator {
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGeneratorFactory.forConstantValue(value),
TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)))
// new HttpRecordSender<ActivePowerRecord>(URI.create("http://localhost:8080"))))
recordSender))
.withThreads(threads);
}
......
package theodolite.commons.workloadgeneration;
import java.util.stream.Stream;
enum LoadGeneratorTarget {
KAFKA("kafka"), HTTP("http");
final String value;
LoadGeneratorTarget(final String value) {
this.value = value;
}
static LoadGeneratorTarget from(final String value) {
return Stream.of(LoadGeneratorTarget.values())
.filter(t -> t.value.equals(value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Target '" + value + "' does not exist."));
}
}
......@@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration;
import org.junit.Assert;
import org.junit.Test;
import theodolite.commons.workloadgeneration.KeySpace;
public class KeySpaceTest {
......
package theodolite.commons.workloadgeneration;
import org.junit.Assert;
import org.junit.Test;
public class LoadGeneratorTargetTest {
@Test
public void testFromKafka() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from("kafka");
Assert.assertEquals(LoadGeneratorTarget.KAFKA, target);
}
@Test
public void testFromHttp() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from("http");
Assert.assertEquals(LoadGeneratorTarget.HTTP, target);
}
@Test(expected = IllegalArgumentException.class)
public void testFromInvalidTarget() {
LoadGeneratorTarget.from("<invalid-target>");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment