diff --git a/theodolite-benchmarks/load-generator-commons/build.gradle b/theodolite-benchmarks/load-generator-commons/build.gradle index f2aa10b079f4be80d19d9ac5d822b7bdab0b6d78..2d8f77b5154b5b788e0729da69122b443740ce75 100644 --- a/theodolite-benchmarks/load-generator-commons/build.gradle +++ b/theodolite-benchmarks/load-generator-commons/build.gradle @@ -13,14 +13,16 @@ repositories { } dependencies { - implementation 'com.google.guava:guava:30.1-jre' implementation 'com.hazelcast:hazelcast:4.1.1' implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1' implementation 'org.slf4j:slf4j-simple:1.7.25' + implementation 'com.google.guava:guava:30.1-jre' + implementation 'com.google.code.gson:gson:2.8.2' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? // Use JUnit test framework testImplementation 'junit:junit:4.12' + testImplementation 'com.github.tomakehurst:wiremock-jre8:2.32.0' } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 45ac1d5bb9c21a1b6303de2f248d08b69c02fc28..e4787db047232f6392a52ce81c248f11c1dbaaf2 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -23,6 +23,8 @@ public final class ConfigurationKeys { public static final String THREADS = "THREADS"; + public static final String TARGET = "TARGET"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; @@ -35,6 +37,8 @@ public final class ConfigurationKeys { public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; + public static final String HTTP_URI = "HTTP_URI"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..6b7a5db067c8117f046aa0ff1c6f5d56c35c4321 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java @@ -0,0 +1,93 @@ +package theodolite.commons.workloadgeneration; + +import com.google.gson.Gson; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.avro.specific.SpecificRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends monitoring records via HTTP. + * + * @param <T> {@link SpecificRecord} to send + */ +public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> { + + private static final int HTTP_OK = 200; + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); + + private final Gson gson = new Gson(); + + private final HttpClient httpClient = HttpClient.newBuilder().build(); + + private final URI uri; + + private final boolean async; + + private final List<Integer> validStatusCodes; + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + */ + public HttpRecordSender(final URI uri) { + this(uri, true, List.of(HTTP_OK)); + } + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + * @param async whether HTTP requests should be sent asynchronous + * @param validStatusCodes a list of HTTP status codes which are considered as successful + */ + public HttpRecordSender(final URI uri, final boolean async, + final List<Integer> validStatusCodes) { + this.uri = uri; + this.async = async; + this.validStatusCodes = validStatusCodes; + } + + @Override + public void send(final T message) { + final String json = this.gson.toJson(message); + final HttpRequest request = HttpRequest.newBuilder() + .uri(this.uri) + .POST(HttpRequest.BodyPublishers.ofString(json)) + .build(); + final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); + // final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); + + final CompletableFuture<HttpResponse<Void>> result = + this.httpClient.sendAsync(request, bodyHandler) + .whenComplete((response, exception) -> { + if (exception != null) { // NOPMD + LOGGER.warn("Couldn't send request to {}.", this.uri, exception); // NOPMD false-p. + } else if (!this.validStatusCodes.contains(response.statusCode())) { // NOPMD + LOGGER.warn("Received status code {} for request to {}.", response.statusCode(), + this.uri); + } else { + LOGGER.debug("Sucessfully sent request to {} (status={}).", this.uri, + response.statusCode()); + } + }); + if (this.async) { + try { + result.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Couldn't get result for request to {}.", this.uri, e); + } + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index ded7c347c8d6b057581dc63b691df5bb60997791..44ff8a92afd5356b4bb2af203899a61f7af48b2d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * - * @param <T> {@link IMonitoringRecord} to send + * @param <T> {@link SpecificRecord} to send */ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 3f5d14c2e7dccb94e4aacde1f531ec2e9d1fb8db..9e467864f89f746069aa3ca06aa087955ebc1575 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,11 +1,13 @@ package theodolite.commons.workloadgeneration; +import java.net.URI; import java.time.Duration; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. @@ -20,9 +22,11 @@ public final class LoadGenerator { private static final int PERIOD_MS_DEFAULT = 1000; private static final int VALUE_DEFAULT = 10; private static final int THREADS_DEFAULT = 4; + private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -134,6 +138,43 @@ public final class LoadGenerator { clusterConfig.setClusterNamePrefix(portAutoIncrement); } + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + TARGET_DEFAULT.getValue())); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI uri = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URI), + HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(uri); + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.NUM_SENSORS), Integer.toString(NUMBER_OF_KEYS_DEFAULT))); @@ -146,22 +187,6 @@ public final class LoadGenerator { final int threads = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.THREADS), Integer.toString(THREADS_DEFAULT))); - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); return new LoadGenerator() .setClusterConfig(clusterConfig) @@ -170,10 +195,7 @@ public final class LoadGenerator { Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( TitanRecordGeneratorFactory.forConstantValue(value), - TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl))) + recordSender)) .withThreads(threads); } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java new file mode 100644 index 0000000000000000000000000000000000000000..086e4de36301693c6873016122a47709b858a0d4 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -0,0 +1,26 @@ +package theodolite.commons.workloadgeneration; + +import java.util.stream.Stream; + +enum LoadGeneratorTarget { + + KAFKA("kafka"), HTTP("http"); + + private final String value; + + LoadGeneratorTarget(final String value) { + this.value = value; + } + + String getValue() { + return this.value; + } + + static LoadGeneratorTarget from(final String value) { + return Stream.of(LoadGeneratorTarget.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Target '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..002a97d1badfca560410b76d7d2ad260b90afbfd --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java @@ -0,0 +1,52 @@ +package theodolite.commons.workloadgeneration; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.exactly; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.google.gson.Gson; +import java.net.URI; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import titan.ccp.model.records.ActivePowerRecord; + +public class HttpRecordSenderTest { + + private HttpRecordSender<ActivePowerRecord> httpRecordSender; + + private Gson gson; + + @Rule + public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort()); + + @Before + public void setup() { + this.httpRecordSender = + new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port())); + this.gson = new Gson(); + } + + @Test + public void testValidUri() { + this.wireMockRule.stubFor( + post(urlPathEqualTo("/")) + .willReturn( + aResponse() + .withStatus(200) + .withBody("received"))); + + final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34); + this.httpRecordSender.send(record); + + verify(exactly(1), postRequestedFor(urlEqualTo("/")) + .withRequestBody(equalTo(this.gson.toJson(record)))); // toJson + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java index 20c094ddcc7ff110a25aaffa494766e89d4d2475..49004839a9c8fd280aba5006a1f08c2acb3c3136 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java @@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration; import org.junit.Assert; import org.junit.Test; -import theodolite.commons.workloadgeneration.KeySpace; public class KeySpaceTest { diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java new file mode 100644 index 0000000000000000000000000000000000000000..644ffad9a4d2732f72ac307294d1311eba3a9ce8 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java @@ -0,0 +1,26 @@ +package theodolite.commons.workloadgeneration; + +import org.junit.Assert; +import org.junit.Test; + +public class LoadGeneratorTargetTest { + + @Test + public void testFromKafka() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("kafka"); + Assert.assertEquals(LoadGeneratorTarget.KAFKA, target); + } + + @Test + public void testFromHttp() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("http"); + Assert.assertEquals(LoadGeneratorTarget.HTTP, target); + } + + @Test(expected = IllegalArgumentException.class) + public void testFromInvalidTarget() { + LoadGeneratorTarget.from("<invalid-target>"); + } + + +}