Skip to content
Snippets Groups Projects
Commit 298ad96b authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'http-load-generator' into 'master'

Add option to generate load via HTTP

Closes #279

See merge request !202
parents cdca4888 60503b73
No related branches found
No related tags found
1 merge request!202Add option to generate load via HTTP
Pipeline #6072 passed
Showing
with 247 additions and 23 deletions
...@@ -13,14 +13,16 @@ repositories { ...@@ -13,14 +13,16 @@ repositories {
} }
dependencies { dependencies {
implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.hazelcast:hazelcast:4.1.1' implementation 'com.hazelcast:hazelcast:4.1.1'
implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1' implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1'
implementation 'org.slf4j:slf4j-simple:1.7.25' implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.google.code.gson:gson:2.8.2'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required?
// Use JUnit test framework // Use JUnit test framework
testImplementation 'junit:junit:4.12' testImplementation 'junit:junit:4.12'
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.32.0'
} }
...@@ -23,6 +23,8 @@ public final class ConfigurationKeys { ...@@ -23,6 +23,8 @@ public final class ConfigurationKeys {
public static final String THREADS = "THREADS"; public static final String THREADS = "THREADS";
public static final String TARGET = "TARGET";
public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
...@@ -35,6 +37,8 @@ public final class ConfigurationKeys { ...@@ -35,6 +37,8 @@ public final class ConfigurationKeys {
public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
public static final String HTTP_URI = "HTTP_URI";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
package theodolite.commons.workloadgeneration;
import com.google.gson.Gson;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends monitoring records via HTTP.
*
* @param <T> {@link SpecificRecord} to send
*/
public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> {
private static final int HTTP_OK = 200;
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
private final Gson gson = new Gson();
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final URI uri;
private final boolean async;
private final List<Integer> validStatusCodes;
/**
* Create a new {@link HttpRecordSender}.
*
* @param uri the {@link URI} records should be sent to
*/
public HttpRecordSender(final URI uri) {
this(uri, true, List.of(HTTP_OK));
}
/**
* Create a new {@link HttpRecordSender}.
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param validStatusCodes a list of HTTP status codes which are considered as successful
*/
public HttpRecordSender(final URI uri, final boolean async,
final List<Integer> validStatusCodes) {
this.uri = uri;
this.async = async;
this.validStatusCodes = validStatusCodes;
}
@Override
public void send(final T message) {
final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri)
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
// final BodyHandler<String> bodyHandler = BodyHandlers.ofString();
final CompletableFuture<HttpResponse<Void>> result =
this.httpClient.sendAsync(request, bodyHandler)
.whenComplete((response, exception) -> {
if (exception != null) { // NOPMD
LOGGER.warn("Couldn't send request to {}.", this.uri, exception); // NOPMD false-p.
} else if (!this.validStatusCodes.contains(response.statusCode())) { // NOPMD
LOGGER.warn("Received status code {} for request to {}.", response.statusCode(),
this.uri);
} else {
LOGGER.debug("Sucessfully sent request to {} (status={}).", this.uri,
response.statusCode());
}
});
if (this.async) {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
}
}
}
}
...@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; ...@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Sends monitoring records to Kafka. * Sends monitoring records to Kafka.
* *
* @param <T> {@link IMonitoringRecord} to send * @param <T> {@link SpecificRecord} to send
*/ */
public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> {
......
package theodolite.commons.workloadgeneration; package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/** /**
* A Theodolite load generator. * A Theodolite load generator.
...@@ -20,9 +22,11 @@ public final class LoadGenerator { ...@@ -20,9 +22,11 @@ public final class LoadGenerator {
private static final int PERIOD_MS_DEFAULT = 1000; private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10; private static final int VALUE_DEFAULT = 10;
private static final int THREADS_DEFAULT = 4; private static final int THREADS_DEFAULT = 4;
private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String HTTP_URI_DEFAULT = "http://localhost:8080";
private ClusterConfig clusterConfig; private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition; private WorkloadDefinition loadDefinition;
...@@ -134,6 +138,43 @@ public final class LoadGenerator { ...@@ -134,6 +138,43 @@ public final class LoadGenerator {
clusterConfig.setClusterNamePrefix(portAutoIncrement); clusterConfig.setClusterNamePrefix(portAutoIncrement);
} }
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
TARGET_DEFAULT.getValue()));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI uri = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URI),
HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(uri);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse( final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS), System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT))); Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
...@@ -146,22 +187,6 @@ public final class LoadGenerator { ...@@ -146,22 +187,6 @@ public final class LoadGenerator {
final int threads = Integer.parseInt(Objects.requireNonNullElse( final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS), System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT))); Integer.toString(THREADS_DEFAULT)));
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
return new LoadGenerator() return new LoadGenerator()
.setClusterConfig(clusterConfig) .setClusterConfig(clusterConfig)
...@@ -170,10 +195,7 @@ public final class LoadGenerator { ...@@ -170,10 +195,7 @@ public final class LoadGenerator {
Duration.ofMillis(periodMs))) Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig( .setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGeneratorFactory.forConstantValue(value), TitanRecordGeneratorFactory.forConstantValue(value),
TitanKafkaSenderFactory.forKafkaConfig( recordSender))
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)))
.withThreads(threads); .withThreads(threads);
} }
......
package theodolite.commons.workloadgeneration;
import java.util.stream.Stream;
enum LoadGeneratorTarget {
KAFKA("kafka"), HTTP("http");
private final String value;
LoadGeneratorTarget(final String value) {
this.value = value;
}
String getValue() {
return this.value;
}
static LoadGeneratorTarget from(final String value) {
return Stream.of(LoadGeneratorTarget.values())
.filter(t -> t.value.equals(value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Target '" + value + "' does not exist."));
}
}
package theodolite.commons.workloadgeneration;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.exactly;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.google.gson.Gson;
import java.net.URI;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import titan.ccp.model.records.ActivePowerRecord;
public class HttpRecordSenderTest {
private HttpRecordSender<ActivePowerRecord> httpRecordSender;
private Gson gson;
@Rule
public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
@Before
public void setup() {
this.httpRecordSender =
new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port()));
this.gson = new Gson();
}
@Test
public void testValidUri() {
this.wireMockRule.stubFor(
post(urlPathEqualTo("/"))
.willReturn(
aResponse()
.withStatus(200)
.withBody("received")));
final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34);
this.httpRecordSender.send(record);
verify(exactly(1), postRequestedFor(urlEqualTo("/"))
.withRequestBody(equalTo(this.gson.toJson(record)))); // toJson
}
}
...@@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration; ...@@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import theodolite.commons.workloadgeneration.KeySpace;
public class KeySpaceTest { public class KeySpaceTest {
......
package theodolite.commons.workloadgeneration;
import org.junit.Assert;
import org.junit.Test;
public class LoadGeneratorTargetTest {
@Test
public void testFromKafka() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from("kafka");
Assert.assertEquals(LoadGeneratorTarget.KAFKA, target);
}
@Test
public void testFromHttp() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from("http");
Assert.assertEquals(LoadGeneratorTarget.HTTP, target);
}
@Test(expected = IllegalArgumentException.class)
public void testFromInvalidTarget() {
LoadGeneratorTarget.from("<invalid-target>");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment