diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7d14e478aed45186eb71aa78ca65d8b54ade5856..fc590f029433f64a0f655dc28c3898c7847e4ab5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -131,7 +131,7 @@ lint-helm: script: helm lint helm/ rules: - changes: - - helm/* + - helm/**/* - when: manual allow_failure: true diff --git a/docs/_config.yml b/docs/_config.yml index 0d2a1aa774a83347c80b538a97d5dbfa1b7639b3..95cc5a635494842f1894c572cb117aafb3bb6810 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,7 +3,7 @@ description: >- Theodolite is a framework for benchmarking the horizontal and vertical scalability of cloud-native applications. -remote_theme: pmarsceill/just-the-docs +remote_theme: just-the-docs/just-the-docs aux_links: "Theodolite on GitHub": - "//github.com/cau-se/theodolite" diff --git a/docs/running-benchmarks.md b/docs/running-benchmarks.md index 0a76316c0515233f9445b363f941d60ab7aa0e06..5051cb5b685deb17212c1489c585e75262bf9da5 100644 --- a/docs/running-benchmarks.md +++ b/docs/running-benchmarks.md @@ -130,7 +130,7 @@ If [persisting results](installation#persisting-results) is enabled in Theodolit For installations without persistence, but also as an alternative for installations with persistence, we provide a second option to access results: Theodolite comes with a *results access sidecar*. It allows to copy all benchmark results from the Theodolite pod to your current working directory on your host machine with the following command: ```sh -kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):/results . -c results-access +kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):results . -c results-access ``` ## Analyzing Benchmark Results diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md index 8fefa19617263784cdf102d91e93ad492adbe63c..a41c97d52f62f399c9289a15a64991d0fed228ce 100644 --- a/docs/theodolite-benchmarks/load-generator.md +++ b/docs/theodolite-benchmarks/load-generator.md @@ -55,6 +55,8 @@ The prebuilt container images can be configured with the following environment v | `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | +| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` | +| `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | 1000 | | `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | | `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | | `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | diff --git a/helm/preconfigs/minimal.yaml b/helm/preconfigs/minimal.yaml index 80a83f06cc9838e01f812e730932b9b79d947161..3f2dba0b55b729654125377996381718d7c15eb4 100644 --- a/helm/preconfigs/minimal.yaml +++ b/helm/preconfigs/minimal.yaml @@ -1,15 +1,15 @@ -cp-helm-charts: - cp-zookeeper: - servers: 1 - - cp-kafka: - brokers: 1 - configurationOverrides: - offsets.topic.replication.factor: "1" - operator: sloChecker: droppedRecordsKStreams: enabled: false resultsVolume: enabled: false + +strimzi: + kafka: + replicas: 1 + config: + "offsets.topic.replication.factor": "1" + zookeeper: + replicas: 1 + \ No newline at end of file diff --git a/helm/templates/kafka/kafka-cluster.yaml b/helm/templates/kafka/kafka-cluster.yaml index 29cf038f12aa6ee38b21697b8d79b5aea378c7d8..1ff89396513a134e553bc4b97f771822f52ac2ed 100644 --- a/helm/templates/kafka/kafka-cluster.yaml +++ b/helm/templates/kafka/kafka-cluster.yaml @@ -30,6 +30,15 @@ spec: configMapKeyRef: name: {{ template "theodolite.fullname" . }}-kafka-metrics key: kafka-metrics-config.yml + {{- with .Values.strimzi.kafka.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . | nindent 16 }} + {{- end}} zookeeper: {{- with .Values.strimzi.zookeeper.replicas }} @@ -37,7 +46,16 @@ spec: {{- toYaml . | nindent 6 }} {{- end }} storage: - type: ephemeral + type: ephemeral + {{- with .Values.strimzi.zookeeper.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . | nindent 16 }} + {{- end}} kafkaExporter: {} diff --git a/helm/templates/theodolite/role.yaml b/helm/templates/theodolite/role.yaml index ba5a223b6527df94b64fac3574ee5f90fdb3903b..b8d4d2d005d5a969c2c72cdca145f829d748e419 100644 --- a/helm/templates/theodolite/role.yaml +++ b/helm/templates/theodolite/role.yaml @@ -45,6 +45,19 @@ rules: - list - create - get + - apiGroups: + - kafka.strimzi.io + resources: + - kafkatopics + verbs: + - delete + - list + - get + - create + - update + {{- with .Values.rbac.additionalRules }} +{{ toYaml . | indent 2 }} + {{- end }} {{- if .Values.operator.enabled }} - apiGroups: - theodolite.com diff --git a/helm/values.yaml b/helm/values.yaml index 0f1d4790b6d03d42f6ccc65a5156eb70999867e6..765f8e4e6bd0a0f9d59dc812d4b7a01d134e10b0 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -173,6 +173,8 @@ strimzi: jvmOptions: "-Xmx": "512M" "-Xms": "512M" + nodeSelectorTerms: [] + zookeeper: replicas: 3 zooEntrance: @@ -180,8 +182,10 @@ strimzi: zookeeperClient: enabled: true nodeSelector: {} + nodeSelectorTerms: [] + topicOperator: - enabled: false + enabled: true ### @@ -341,6 +345,7 @@ serviceAccount: rbac: create: true + additionalRules: [] randomScheduler: enabled: true diff --git a/slo-checker/record-lag/app/main.py b/slo-checker/record-lag/app/main.py index 2e38354d45df57087a94e57d5c9ca412ed5534d3..bb68580a638a40bc7ae975594b859d10784adc67 100644 --- a/slo-checker/record-lag/app/main.py +++ b/slo-checker/record-lag/app/main.py @@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG': def calculate_slope_trend(results, warmup): d = [] for result in results: - group = result['metric']['consumergroup'] + group = result['metric'].get('consumergroup', "default") for value in result['values']: d.append({'group': group, 'timestamp': int( value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/AbstractPipelineFactory.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/AbstractPipelineFactory.java index 4976f46e231c472599f85a72f698e26a09cbc860..f6dc64bb2c8f4deb0df6e48db23b0d62c6d86279 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/AbstractPipelineFactory.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/AbstractPipelineFactory.java @@ -48,10 +48,13 @@ public abstract class AbstractPipelineFactory { final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); + this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT)); + consumerConfig.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + this.config.getString(ConfigurationKeys.MAX_POLL_RECORDS)); consumerConfig.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET)); consumerConfig.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java index dd410f8d52e269a863b5d6dab62196c0d9690c98..0165fa644e1853353e73caeaf0b9d2df0f8e9aea 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java @@ -1,7 +1,9 @@ package rocks.theodolite.benchmarks.commons.beam; +import java.io.IOException; import java.util.function.Function; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -23,6 +25,7 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + private PipelineResult pipelineResult; /** * Create a new {@link BeamService}. @@ -43,14 +46,43 @@ public class BeamService { } /** - * Start this microservice, by running the underlying Beam pipeline. + * Start this microservice by running the underlying Beam pipeline. */ public void run() { - LOGGER.info("Construct Beam pipeline with pipeline options: {}", + LOGGER.info("Constructing Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); LOGGER.info("Starting BeamService {}.", this.applicationName); - pipeline.run().waitUntilFinish(); + this.pipelineResult = pipeline.run(); + } + + /** + * Start this microservice by running the underlying Beam pipeline and block until this process is + * terminated. + */ + public void runStandalone() { + this.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop())); + this.pipelineResult.waitUntilFinish(); + } + + /** + * Stop this microservice by canceling the underlying Beam pipeline. + */ + public void stop() { + LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName); + if (this.pipelineResult == null) { + throw new IllegalStateException("Cannot stop service since it has never been started."); + } + LOGGER.info("Stopping Beam pipeline."); + try { + this.pipelineResult.cancel(); + this.pipelineResult = null; // NOPMD use null to indicate absence + } catch (final IOException e) { + throw new IllegalStateException( + "Stopping the service failed due to failed stop of Beam pipeline.", e); + } + LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index 487b8de00c35bbe28961f29de7ba0bb9d57e98ec..c22c164f62ad22d3c18add75ad5115fd15fb8f14 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -33,16 +33,17 @@ public final class ConfigurationKeys { // BEAM - public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config"; + public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; - public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config"; + public static final String MAX_POLL_RECORDS = "max.poll.records"; + + public static final String AUTO_OFFSET_RESET = "auto.offset.reset"; public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; - public static final String TRIGGER_INTERVAL = "trigger.interval"; + public static final String TRIGGER_INTERVAL = "trigger.interval"; - private ConfigurationKeys() { - } + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java index 3ffa7a647a308d9a8fde210ff54f32b177604c36..eb80d25eb327f2e3dc10dc2977131ac7edfef69d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java @@ -41,6 +41,10 @@ public final class ConfigurationKeys { public static final String HTTP_URL = "HTTP_URL"; + public static final String HTTP_ASYNC = "HTTP_ASYNC"; + + public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java index 445ccc6ed9f8a7e7ad8834aeb98b18206286e9d4..ae9a6d4220ceaec091a0a2fb49fb82f16fdbb42e 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java @@ -42,9 +42,9 @@ class EnvVarLoadGeneratorFactory { .setLoadDefinition(new WorkloadDefinition( new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) - .setGeneratorConfig(new LoadGeneratorConfig( + .setGeneratorConfig(new LoadGeneratorConfig(GeneratorAction.from( TitanRecordGenerator.forConstantValue(value), - this.buildRecordSender())) + this.buildRecordSender()))) .withThreads(threads); } @@ -119,8 +119,14 @@ class EnvVarLoadGeneratorFactory { Objects.requireNonNullElse( System.getenv(ConfigurationKeys.HTTP_URL), LoadGenerator.HTTP_URI_DEFAULT)); - recordSender = new HttpRecordSender<>(url); - LOGGER.info("Use HTTP server as target with url '{}'.", url); + final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_ASYNC), + Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT))); + final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS), + Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT))); + recordSender = new HttpRecordSender<>(url, async, Duration.ofMillis(timeoutMs)); + LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async); } else if (target == LoadGeneratorTarget.PUBSUB) { final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); final String inputTopic = Objects.requireNonNullElse( @@ -136,7 +142,7 @@ class EnvVarLoadGeneratorFactory { LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); } else { - throw new IllegalStateException("Neither an emulator host nor a project was provided."); + throw new IllegalStateException("Neither an emulator host nor a project was provided."); } } else { // Should never happen diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java index e8a03fc8776d0cb07fc9623df93688db7455a042..fb3bf1c9f802d2af2a0eae72bc58c9e609a6b624 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java @@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator; * it. */ @FunctionalInterface -interface GeneratorAction { +public interface GeneratorAction { void generate(final String key); + default void shutdown() { + // Nothing to do per default + } + public static <T> GeneratorAction from( final RecordGenerator<? extends T> generator, final RecordSender<? super T> sender) { - return key -> sender.send(generator.generate(key)); + return new GeneratorActionImpl<>(generator, sender); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1d177b7193ee8df8b4d65546cbbcbb6f49a95488 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java @@ -0,0 +1,27 @@ +package rocks.theodolite.benchmarks.loadgenerator; + +class GeneratorActionImpl<T> implements GeneratorAction { + + private final RecordGenerator<? extends T> generator; + + private final RecordSender<? super T> sender; + + public GeneratorActionImpl( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + this.generator = generator; + this.sender = sender; + } + + @Override + public void shutdown() { + this.generator.close(); + this.sender.close(); + } + + @Override + public void generate(final String key) { + this.sender.send(this.generator.generate(key)); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java index f3895f5d46dedcfa38d5fa9f2c43fb12a1aee672..3b222a332fc76159aedcf82a1753363c7b1e414e 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java @@ -36,7 +36,7 @@ public class HazelcastRunner { } /** - * Start the workload generation and blocks until the workload generation is stopped again. + * Start the load generation and blocks until the load generation is stopped again. */ public void runBlocking() { while (!this.stopAction.isDone()) { @@ -52,19 +52,24 @@ public class HazelcastRunner { } public void restart() { - this.stopRunnerState(); + this.stopRunnerStateAsync(); } + /** + * Stop generating load and clean up the entire state. + */ public void stop() { this.stopAction.complete(null); - this.stopRunnerState(); + this.stopRunnerStateAsync().join(); + this.hzInstance.shutdown(); } - private void stopRunnerState() { + private CompletableFuture<Void> stopRunnerStateAsync() { synchronized (this) { if (this.runnerState != null) { - this.runnerState.stopAsync(); + return this.runnerState.stopAsync(); } + return CompletableFuture.completedFuture(null); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java index 79b86b6eecd980b8d90acab682687b6f8804d1b6..81a6db7329309a8c2b09fb3308cb496c5420a206 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java @@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance { private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500); private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); + private final CompletableFuture<Void> stopFinished = new CompletableFuture<>(); private LoadGeneratorExecution loadGeneratorExecution; private final LoadGeneratorConfig loadGeneratorConfig; @@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance { } this.stopAction.join(); this.stopLoadGeneration(); + this.stopFinished.complete(null); } - public void stopAsync() { + public CompletableFuture<Void> stopAsync() { this.stopAction.complete(null); + return this.stopFinished; } private void tryPerformBeforeAction() { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java index f9c0d3830e955ca88864d61e8bb74dc0974c1f26..77706d824808132eaa7212194de0d69c346e4eba 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java @@ -7,6 +7,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -23,6 +24,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< private static final int HTTP_OK = 200; + private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); private final Gson gson = new Gson(); @@ -33,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< private final boolean async; + private final Duration connectionTimeout; + private final List<Integer> validStatusCodes; /** @@ -41,7 +46,18 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * @param uri the {@link URI} records should be sent to */ public HttpRecordSender(final URI uri) { - this(uri, true, List.of(HTTP_OK)); + this(uri, false, DEFAULT_CONNECTION_TIMEOUT); + } + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + * @param async whether HTTP requests should be sent asynchronous + * @param connectionTimeout timeout for the HTTP connection + */ + public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) { + this(uri, async, connectionTimeout, List.of(HTTP_OK)); } /** @@ -49,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * * @param uri the {@link URI} records should be sent to * @param async whether HTTP requests should be sent asynchronous + * @param connectionTimeout timeout for the HTTP connection * @param validStatusCodes a list of HTTP status codes which are considered as successful */ - public HttpRecordSender(final URI uri, final boolean async, + public HttpRecordSender( + final URI uri, + final boolean async, + final Duration connectionTimeout, final List<Integer> validStatusCodes) { this.uri = uri; this.async = async; + this.connectionTimeout = connectionTimeout; this.validStatusCodes = validStatusCodes; } @@ -63,6 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< final String json = this.gson.toJson(message); final HttpRequest request = HttpRequest.newBuilder() .uri(this.uri) + .timeout(this.connectionTimeout) .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); @@ -81,13 +103,19 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< response.statusCode()); } }); - if (this.async) { + if (this.isSync()) { try { result.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException e) { LOGGER.error("Couldn't get result for request to {}.", this.uri, e); + } catch (final ExecutionException e) { // NOPMD + // Do nothing, Exception is already handled } } } + private boolean isSync() { + return !this.async; + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java index 45731e8bce79264252c55a61b9efa5245f610c3b..56b1946ad78d888fe6e5140fdc373bb2cd3a4ed4 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java @@ -4,115 +4,72 @@ import java.util.Properties; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** - * Sends monitoring records to Kafka. + * Sends records to Kafka. * - * @param <T> {@link SpecificRecord} to send + * @param <T> Record type to send. */ -public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { +public interface KafkaRecordSender<T> extends RecordSender<T> { - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; + @Override + public void close(); /** - * Create a new {@link KafkaRecordSender}. + * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}. + * + * @param bootstrapServers The server to for accessing Kafka. + * @param topic The topic where to write. + * @param serializer The {@link Serializer} for mapping a value to keys. */ - private KafkaRecordSender(final Builder<T> builder) { - this.topic = builder.topic; - this.keyAccessor = builder.keyAccessor; - this.timestampAccessor = builder.timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(builder.defaultProperties); - properties.put("bootstrap.servers", builder.bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - final SchemaRegistryAvroSerdeFactory avroSerdeFactory = - new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); - this.producer = new KafkaProducer<>( - properties, - new StringSerializer(), - avroSerdeFactory.<T>forKeys().serializer()); + public static <T> Builder<T> builderWithSerializer( + final String bootstrapServers, + final String topic, + final Serializer<T> serializer) { + return new Builder<>(bootstrapServers, topic, serializer); } /** - * Write the passed monitoring record to Kafka. + * Creates a Builder object for a {@link KafkaRecordSender} based on a Confluent Schema Registry + * URL. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } - } - - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - this.write(message); - } - - public static <T extends SpecificRecord> Builder<T> builder( + public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry( final String bootstrapServers, final String topic, final String schemaRegistryUrl) { - return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + return new Builder<>(bootstrapServers, topic, avroSerdeFactory.<T>forValues().serializer()); } /** - * Builder class to build a new {@link KafkaRecordSender}. + * Builder class to build a new {@link KafkaRecordSenderImpl}. * * @param <T> Type of the records that should later be send. */ - public static class Builder<T extends SpecificRecord> { + public static class Builder<T> { private final String bootstrapServers; private final String topic; - private final String schemaRegistryUrl; + private final Serializer<T> serializer; private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD private Properties defaultProperties = new Properties(); // NOPMD - /** - * Creates a Builder object for a {@link KafkaRecordSender}. - * - * @param bootstrapServers The Server to for accessing Kafka. - * @param topic The topic where to write. - * @param schemaRegistryUrl URL to the schema registry for avro. - */ private Builder(final String bootstrapServers, final String topic, - final String schemaRegistryUrl) { + final Serializer<T> serializer) { this.bootstrapServers = bootstrapServers; this.topic = topic; - this.schemaRegistryUrl = schemaRegistryUrl; + this.serializer = serializer; } public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { @@ -130,9 +87,51 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender return this; } + /** + * Create a {@link KafkaRecordSender} from this builder. + */ public KafkaRecordSender<T> build() { - return new KafkaRecordSender<>(this); + final Properties properties = new Properties(); + properties.putAll(this.defaultProperties); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + return new KafkaRecordSenderImpl<>( + new KafkaProducer<>( + properties, + new StringSerializer(), + this.serializer), + new DefaultRecordFactory<>(), + this.topic, + this.keyAccessor, + this.timestampAccessor); } + + private static class DefaultRecordFactory<T> implements KafkaRecordFactory<T, String, T> { + + @Override + public ProducerRecord<String, T> create(final String topic, final String key, final T value, + final long timestamp) { + return new ProducerRecord<>(topic, null, timestamp, key, value); + } + + } + } + + /** + * Create Kafka {@link ProducerRecord}s from a topic, a key, a value and a timestamp. + * + * @param <T> type the records should be created from. + * @param <K> key type of the {@link ProducerRecord}s. + * @param <V> value type of the {@link ProducerRecord}s. + */ + public static interface KafkaRecordFactory<T, K, V> { + + ProducerRecord<K, V> create(String topic, String key, T value, long timestamp); + } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..02a4d206b0a2414d5f12f5348f6c1bfc56852281 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java @@ -0,0 +1,70 @@ +package rocks.theodolite.benchmarks.loadgenerator; + +import java.util.function.Function; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends records to Kafka. + * + * @param <T> Record type to send. + * @param <K> Internal key type for Kafka records. + * @param <V> Internal value type for Kafka records. + */ +/* default */ class KafkaRecordSenderImpl<T, K, V> implements KafkaRecordSender<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSenderImpl.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<K, V> producer; + + private final KafkaRecordFactory<T, K, V> recordFactory; + + /** + * Create a new {@link KafkaRecordSenderImpl}. + */ + protected KafkaRecordSenderImpl( + final Producer<K, V> producer, + final KafkaRecordFactory<T, K, V> recordFactory, + final String topic, + final Function<T, String> keyAccessor, + final Function<T, Long> timestampAccessor) { + this.topic = topic; + this.producer = producer; + this.recordFactory = recordFactory; + this.keyAccessor = keyAccessor; + this.timestampAccessor = timestampAccessor; + } + + @Override + public void close() { + this.producer.close(); + } + + @Override + public void send(final T message) { + final ProducerRecord<K, V> record = this.recordFactory.create( + this.topic, + this.keyAccessor.apply(message), + message, + this.timestampAccessor.apply(message)); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java index 6866b39e51570299d05795557b66997dc934c035..4be9b5695a54dedac6df78e6ceb8230752301e22 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java @@ -17,6 +17,8 @@ public final class LoadGenerator { public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; // Target: HTTP public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + public static final boolean HTTP_ASYNC_DEFAULT = false; + public static final long HTTP_TIMEOUT_MS_DEFAULT = 1_000; // Target: Kafka public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS @@ -78,6 +80,7 @@ public final class LoadGenerator { this.clusterConfig, this.generatorConfig, this.loadDefinition); + Runtime.getRuntime().addShutdownHook(new Thread(() -> runner.stop())); runner.runBlocking(); } @@ -91,11 +94,12 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGenerator.forConstantValue(VALUE_DEFAULT), - TitanKafkaSenderFactory.forKafkaConfig( - KAFKA_BOOTSTRAP_SERVERS_DEFAULT, - KAFKA_TOPIC_DEFAULT, - SCHEMA_REGISTRY_URL_DEFAULT))); + GeneratorAction.from( + TitanRecordGenerator.forConstantValue(VALUE_DEFAULT), + TitanKafkaSenderFactory.forKafkaConfig( + KAFKA_BOOTSTRAP_SERVERS_DEFAULT, + KAFKA_TOPIC_DEFAULT, + SCHEMA_REGISTRY_URL_DEFAULT)))); } /** diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java index 97ed0b8fce6a18050e2c5846da1c590e891ed80b..e854138b38613ba614c871febcb80cf9c6b059ef 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java @@ -5,21 +5,16 @@ package rocks.theodolite.benchmarks.loadgenerator; */ public class LoadGeneratorConfig { - private final GeneratorAction messageGenerator; + private final GeneratorAction generatorAction; private BeforeAction beforeAction = BeforeAction.doNothing(); private int threads = 1; - public <T> LoadGeneratorConfig( - final RecordGenerator<? extends T> generator, - final RecordSender<? super T> sender) { - this.messageGenerator = GeneratorAction.from(generator, sender); + public LoadGeneratorConfig(final GeneratorAction generatorAction) { + this.generatorAction = generatorAction; } - public <T> LoadGeneratorConfig( - final RecordGenerator<? extends T> generator, - final RecordSender<? super T> sender, - final int threads) { - this(generator, sender); + public LoadGeneratorConfig(final GeneratorAction generatorAction, final int threads) { + this(generatorAction); this.threads = threads; } @@ -37,7 +32,7 @@ public class LoadGeneratorConfig { public LoadGeneratorExecution buildLoadGeneratorExecution( final WorkloadDefinition workloadDefinition) { - return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); + return new LoadGeneratorExecution(workloadDefinition, this.generatorAction, this.threads); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java index 97c4533dc4b8904f8ae9a5c46c3459216e86b5ca..ecba6961245651c7420d89c5da9bd1f993972188 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java @@ -55,7 +55,8 @@ public class PubSubRecordSender<T> implements RecordSender<T> { /** * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}. */ - public void terminate() { + @Override + public void close() { this.publisher.shutdown(); try { this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java index 0b64ace46a9e04f013f843ecd08dd6fcdf5eed9d..05e127eb019cf877cc5df73e09a6f053ef793fc3 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java @@ -1,5 +1,7 @@ package rocks.theodolite.benchmarks.loadgenerator; +import java.io.Closeable; + /** * This interface describes a function that takes meta information from a string key and produces an * object of type T. @@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator; * @param <T> the type of the objects that will be generated by the function. */ @FunctionalInterface -public interface RecordGenerator<T> { +public interface RecordGenerator<T> extends Closeable { T generate(final String key); + @Override + default void close() { + // Nothing to do per default + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java index 71732b88d2cf3f119140474c387f78b92a9521f8..f1f1bef980f01da4a23b49440be71ba552c13905 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java @@ -1,5 +1,7 @@ package rocks.theodolite.benchmarks.loadgenerator; +import java.io.Closeable; + /** * This interface describes a function that consumes a message {@code T}. This function is dedicated * to be used to transport individual messages to the messaging system. @@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator; * @param <T> the type of records to send as messages. */ @FunctionalInterface -public interface RecordSender<T> { +public interface RecordSender<T> extends Closeable { void send(final T message); + @Override + default void close() { + // Nothing to do per default + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java index 063bbaaab4a24d9dd2d90ef744672e03ac852b8b..ee7d416513439a5d0ba7bad7bcdb09e1baf5e4c7 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java @@ -31,7 +31,7 @@ public final class TitanKafkaSenderFactory { final String schemaRegistryUrl, final Properties properties) { return KafkaRecordSender - .<ActivePowerRecord>builder( + .<ActivePowerRecord>builderWithSchemaRegistry( bootstrapServers, topic, schemaRegistryUrl) diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java index 7c565ace82698bf47f6b3711a28e08f87e8e412b..731dda6c74fd3cd6d74771f95896c2260ce6df29 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSenderTest.java @@ -46,4 +46,22 @@ public class HttpRecordSenderTest { .withRequestBody(equalTo(expectedJson))); // toJson } + @Test + public void testTimeout() { + this.wireMockRule.stubFor( + post(urlPathEqualTo("/")) + .willReturn( + aResponse() + .withFixedDelay(2_000) + .withStatus(200) + .withBody("received"))); + + final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34); + this.httpRecordSender.send(record); + + final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}"; + verify(exactly(1), postRequestedFor(urlEqualTo("/")) + .withRequestBody(equalTo(expectedJson))); // toJson + } + } diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java index e1317219fedf24bc4b0eb4a3f9668da7de196cca..7f39500433a77612fe5ab010372a24ca46035135 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java @@ -17,7 +17,7 @@ public final class Uc1BeamFlink { private Uc1BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java index d3455db71bc3520bfa11c4da3a58c32da46337f9..9c3f650b7ddbe5e3c08139cdec2e590f5d55f3b3 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java @@ -21,6 +21,6 @@ public final class Uc1BeamSamza { * Main method. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java index 32658a21b8b80fddb5baf58002a701e8e35b542e..1f35d592ed9b2b1507eb5c30090d392d37ed7c1e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java @@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; +import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander; import titan.ccp.model.records.ActivePowerRecord; /** @@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord; public class PipelineFactory extends AbstractPipelineFactory { public static final String SINK_TYPE_KEY = "sink.type"; + + private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); public PipelineFactory(final Configuration configuration) { super(configuration); @@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory { // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class); // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost); // } + if (this.sinkType == SinkType.FIRESTORE) { + FirestoreOptionsExpander.expandOptions(options); + } } @Override protected void constructPipeline(final Pipeline pipeline) { - final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); - final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); pipeline.apply(kafkaReader) .apply(Values.create()) - .apply(sinkType.create(this.config)); + .apply(this.sinkType.create(this.config)); } @Override diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java index 4519515cf7d74abb0c447c56df4bbe313133c6a7..c1dc2f7305d01b47de644e4f8d391955540f530c 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java @@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; /** * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. - * + * * @param <T> type the {@link DatabaseWriter} is associated with. */ public class WriterAdapter<T> extends DoFn<T, Void> { diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java new file mode 100644 index 0000000000000000000000000000000000000000..0447450b45b971f96e2f2cbb7ce91f78604d5a23 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java @@ -0,0 +1,34 @@ +package rocks.theodolite.benchmarks.uc1.beam.firestore; + +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Provides a method to expand {@link PipelineOptions} for Firestore. + */ +public final class FirestoreOptionsExpander { + + private FirestoreOptionsExpander() {} + + /** + * Expand {@link PipelineOptions} by special options required for Firestore derived from a default + * configuration. + * + * @param options {@link PipelineOptions} to be expanded. + */ + public static void expandOptions(final PipelineOptions options) { + final GcpOptions firestoreOptions = options.as(GcpOptions.class); + final FirestoreConfig firestoreConfig = getFirestoreConfig(); + firestoreOptions.setProject(firestoreConfig.getProjectId()); + } + + private static FirestoreConfig getFirestoreConfig() { + try { + return FirestoreConfig.createFromDefaults(); + } catch (final IOException e) { + throw new IllegalStateException("Cannot create Firestore configuration.", e); + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties index b785d698cd59a31bff7e9cffc21ca1d877f037fe..e9de96c0df34b1254a8ec9886586e163999c7c6e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties @@ -14,6 +14,7 @@ num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 -specific.avro.reader=True -enable.auto.commit.config=True -auto.offset.reset.config=earliest +specific.avro.reader=true +enable.auto.commit=true +max.poll.records=500 +auto.offset.reset=earliest diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java index ab6a9992a5dfca11a182235b467d5be76488ed55..2772d76fa26f504827ab74acb8fccc45f117365c 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc2BeamFlink { private Uc2BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java index 80981818d401b48ed61ee56987764684df9dd31f..1b3f4ac8a2d052f0d34051e6b17b62100feb129d 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java @@ -19,7 +19,7 @@ public final class Uc2BeamSamza { private Uc2BeamSamza() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties index 1545a0f6630c8ea51d694f4056ca3aa750463f5b..c6672125a8b6a074cb7eca31bd90700cd4da736a 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties @@ -12,6 +12,7 @@ num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 -specific.avro.reader=True -enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +specific.avro.reader=true +enable.auto.commit=true +max.poll.records=500 +auto.offset.reset=earliest \ No newline at end of file diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java index 8782559fea6a08ad2c5a92b355149e3a2ee02ea2..f4f4563925ede4d61edcaab29c3d6e7aed0b5e9c 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java @@ -21,7 +21,7 @@ public final class Uc3BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java index 84e705f6f52f41f5c553a1ef3fb2ebd7ce95e20a..247cd99becff8a200185c8fa40efb49bf31a6806 100644 --- a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java +++ b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java @@ -21,7 +21,7 @@ public final class Uc3BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties index 2db723927eaee10d39e02a6b2d369a06af7711fc..0fe4b240d97f087f00c28430740488f7e01f1577 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties @@ -17,6 +17,7 @@ num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 -specific.avro.reader=True -enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +specific.avro.reader=true +enable.auto.commit=true +max.poll.records=500 +auto.offset.reset=earliest \ No newline at end of file diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java index 5d398d610a12890e3fb9c85804a4b59a69163b4f..f5f9af3fc14b57476975708a139788e7f0386953 100644 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java +++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc4BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java index 044b3dc4b647dffa02a62d17c9fcdaf15b0a0869..585e3ff9589c0262c12b6fa33023cd69b58c53f1 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java @@ -22,7 +22,7 @@ public final class Uc4BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 4cb707017ff90236df4546b87e472b86eb495e10..a71c24eda5385b10a73b9eb65a83bba8363dd3e7 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -251,10 +251,10 @@ public class PipelineFactory extends AbstractPipelineFactory { final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); + this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT)); consumerConfig.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET)); consumerConfig.put( ConsumerConfig.GROUP_ID_CONFIG, this.config .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration"); @@ -265,10 +265,10 @@ public class PipelineFactory extends AbstractPipelineFactory { final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); + this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT)); consumerConfig.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET)); consumerConfig.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); diff --git a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties index bc679580dadf969e181b6787e8287066426be7e2..c1a8ca17b41ab8c8f0fa939c748200db5ba7d0d2 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties @@ -20,6 +20,7 @@ num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 -specific.avro.reader=True -enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +specific.avro.reader=true +enable.auto.commit=true +max.poll.records=500 +auto.offset.reset=earliest \ No newline at end of file diff --git a/theodolite/build.gradle b/theodolite/build.gradle index a066e94f09b71720f9392947640b077b153ccb9c..521137d7315de193f26fdf2307155e587c0dd921 100644 --- a/theodolite/build.gradle +++ b/theodolite/build.gradle @@ -59,6 +59,12 @@ compileTestKotlin { kotlinOptions.jvmTarget = JavaVersion.VERSION_11 } +test { + // Required because of https://github.com/quarkusio/quarkus/issues/18973 + minHeapSize = "256m" + maxHeapSize = "1024m" +} + detekt { failFast = true // fail build on any finding buildUponDefaultConfig = true diff --git a/theodolite/gradle.properties b/theodolite/gradle.properties index fd5768bc24a65dbd43b3ea770c854ae7c0da0a91..a7c5a39013639072e1ef47f9226b95b513d678d7 100644 --- a/theodolite/gradle.properties +++ b/theodolite/gradle.properties @@ -1,8 +1,8 @@ #Gradle properties quarkusPluginId=io.quarkus -quarkusPluginVersion=2.6.3.Final +quarkusPluginVersion=2.7.4.Final quarkusPlatformGroupId=io.quarkus.platform quarkusPlatformArtifactId=quarkus-bom -quarkusPlatformVersion=2.6.3.Final +quarkusPlatformVersion=2.7.4.Final #org.gradle.logging.level=INFO \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt index 966fa56329c8d7d466dd14858bcbc06bb5b857c3..a4345c43ac6a75667c3c3e85c8534697193e1458 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt @@ -7,7 +7,6 @@ import io.fabric8.kubernetes.client.dsl.ExecListener import io.fabric8.kubernetes.client.dsl.ExecWatch import io.fabric8.kubernetes.client.utils.Serialization import mu.KotlinLogging -import okhttp3.Response import theodolite.util.ActionCommandFailedException import theodolite.util.Configuration import java.io.ByteArrayOutputStream @@ -145,10 +144,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { } private class ActionCommandListener(val execLatch: CountDownLatch) : ExecListener { - override fun onOpen(response: Response) { - } - override fun onFailure(throwable: Throwable, response: Response) { + override fun onFailure(throwable: Throwable, response: ExecListener.Response) { execLatch.countDown() throw ActionCommandFailedException("Some error encountered while executing action, caused ${throwable.message})") } diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt index f85b83497e5d69e43c1d4784ef86170a5436e929..eea5b15cb1db7242328033a1bc46fb224d287bc2 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt @@ -1,23 +1,21 @@ package theodolite.benchmark import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.KubernetesClientException import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromString import theodolite.util.DeploymentFailedException -import theodolite.util.YamlParserFromString import java.lang.IllegalArgumentException @RegisterForReflection @JsonDeserialize class ConfigMapResourceSet : ResourceSet, KubernetesResource { lateinit var name: String - lateinit var files: List<String> // load all files, iff files is not set + var files: List<String>? = null // load all files, iff files is not set - override fun getResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, KubernetesResource>> { - val loader = K8sResourceLoaderFromString(client) + override fun getResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, HasMetadata>> { var resources: Map<String, String> try { @@ -31,9 +29,9 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource { throw DeploymentFailedException("Cannot find or read ConfigMap with name '$name'.", e) } - if (::files.isInitialized) { - val filteredResources = resources.filter { files.contains(it.key) } - if (filteredResources.size != files.size) { + files?.run { + val filteredResources = resources.filter { this.contains(it.key) } + if (filteredResources.size != this.size) { throw DeploymentFailedException("Could not find all specified Kubernetes manifests files") } resources = filteredResources @@ -43,14 +41,8 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource { resources .map { Pair( - getKind(resource = it.value), - it - ) - } - .map { - Pair( - it.second.key, - loader.loadK8sResource(it.first, it.second.value) + it.key, // filename + client.resource(it.value).get() ) } } catch (e: IllegalArgumentException) { @@ -59,11 +51,4 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource { } - private fun getKind(resource: String): String { - val parser = YamlParserFromString() - val resourceAsMap = parser.parse(resource, HashMap<String, String>()::class.java) - - return resourceAsMap?.get("kind") - ?: throw DeploymentFailedException("Could not find field kind of Kubernetes resource: ${resourceAsMap?.get("name")}") - } } \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt b/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt index f830232de4b6956fa0f989cae131903377862e6c..e95a637ab88f11902062de73b0c34603b08aded3 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt @@ -1,62 +1,57 @@ package theodolite.benchmark import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile import theodolite.util.DeploymentFailedException -import theodolite.util.YamlParserFromFile -import java.io.File +import java.io.BufferedReader +import java.io.FileInputStream import java.io.FileNotFoundException -import java.lang.IllegalArgumentException +import java.io.InputStreamReader +import java.nio.charset.StandardCharsets +import java.nio.file.Path +import java.nio.file.Paths +import java.util.stream.Collectors +import kotlin.io.path.listDirectoryEntries + @RegisterForReflection @JsonDeserialize class FileSystemResourceSet: ResourceSet, KubernetesResource { lateinit var path: String - lateinit var files: List<String> - - override fun getResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, KubernetesResource>> { - - //if files is set ... - if(::files.isInitialized){ - return files - .map { loadSingleResource(resourceURL = it, client = client) } - } - - return try { - File(path) - .list() !! - .filter { it.endsWith(".yaml") || it.endsWith(".yml") } - .map { - loadSingleResource(resourceURL = it, client = client) - } - } catch (e: NullPointerException) { + var files: List<String>? = null + + override fun getResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, HasMetadata>> { + // if files is set ... + return files?.run { + return this + .map { Paths.get(path, it) } + .map { loadSingleResource(resource = it, client = client) } + } ?: + try { + Paths.get(path) + .listDirectoryEntries() + .filter { it.toString().endsWith(".yaml") || it.toString().endsWith(".yml") } + .map { loadSingleResource(resource = it, client = client) } + } catch (e: java.nio.file.NoSuchFileException) { // not to be confused with Kotlin exception throw DeploymentFailedException("Could not load files located in $path", e) } } - private fun loadSingleResource(resourceURL: String, client: NamespacedKubernetesClient): Pair<String, KubernetesResource> { - val parser = YamlParserFromFile() - val loader = K8sResourceLoaderFromFile(client) - val resourcePath = "$path/$resourceURL" - lateinit var kind: String - - try { - kind = parser.parse(resourcePath, HashMap<String, String>()::class.java)?.get("kind")!! - } catch (e: NullPointerException) { - throw DeploymentFailedException("Can not get Kind from resource $resourcePath", e) - } catch (e: FileNotFoundException){ - throw DeploymentFailedException("File $resourcePath not found", e) - - } - + private fun loadSingleResource(resource: Path, client: NamespacedKubernetesClient): Pair<String, HasMetadata> { return try { - val k8sResource = loader.loadK8sResource(kind, resourcePath) - Pair(resourceURL, k8sResource) + val stream = FileInputStream(resource.toFile()) + val text = BufferedReader( + InputStreamReader(stream, StandardCharsets.UTF_8) + ).lines().collect(Collectors.joining("\n")) + val k8sResource = client.resource(text).get() + Pair(resource.last().toString(), k8sResource) + } catch (e: FileNotFoundException){ + throw DeploymentFailedException("File $resource not found.", e) } catch (e: IllegalArgumentException) { - throw DeploymentFailedException("Could not load resource: $resourcePath", e) + throw DeploymentFailedException("Could not load resource: $resource.", e) } } } \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index d42c2ea3c0ed5394fdcf5b89be0fe0470a15ba62..6857b9bf8918593dbe5085f40eb28fd8bd809d85 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -1,13 +1,13 @@ package theodolite.benchmark import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection import mu.KotlinLogging import theodolite.k8s.K8sManager -import theodolite.k8s.resourceLoader.K8sResourceLoader import theodolite.patcher.PatcherFactory import theodolite.util.* @@ -53,22 +53,27 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { * It first loads them via the [YamlParserFromFile] to check for their concrete type and afterwards initializes them using * the [K8sResourceLoader] */ - fun loadKubernetesResources(resourceSet: List<ResourceSets>): Collection<Pair<String, KubernetesResource>> { + @Deprecated("Use `loadResourceSet` from `ResourceSets`") + fun loadKubernetesResources(resourceSet: List<ResourceSets>): Collection<Pair<String, HasMetadata>> { + return loadResources(resourceSet) + } + + private fun loadResources(resourceSet: List<ResourceSets>): Collection<Pair<String, HasMetadata>> { return resourceSet.flatMap { it.loadResourceSet(this.client) } } override fun setupInfrastructure() { this.infrastructure.beforeActions.forEach { it.exec(client = client) } val kubernetesManager = K8sManager(this.client) - loadKubernetesResources(this.infrastructure.resources) - .map{it.second} + loadResources(this.infrastructure.resources) + .map { it.second } .forEach { kubernetesManager.deploy(it) } } override fun teardownInfrastructure() { val kubernetesManager = K8sManager(this.client) - loadKubernetesResources(this.infrastructure.resources) - .map{it.second} + loadResources(this.infrastructure.resources) + .map { it.second } .forEach { kubernetesManager.remove(it) } this.infrastructure.afterActions.forEach { it.exec(client = client) } } @@ -91,8 +96,8 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ): BenchmarkDeployment { logger.info { "Using $namespace as namespace." } - val appResources = loadKubernetesResources(this.sut.resources) - val loadGenResources = loadKubernetesResources(this.loadGenerator.resources) + val appResources = loadResources(this.sut.resources) + val loadGenResources = loadResources(this.loadGenerator.resources) val patcherFactory = PatcherFactory() @@ -122,7 +127,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { loadGenResources = loadGenResources.map { it.second }, loadGenerationDelay = loadGenerationDelay, afterTeardownDelay = afterTeardownDelay, - kafkaConfig = if (kafkaConfig != null) hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer) else mapOf(), + kafkaConfig = if (kafkaConfig != null) mapOf("bootstrap.servers" to kafkaConfig.bootstrapServer) else mapOf(), topics = kafkaConfig?.topics ?: listOf(), client = this.client ) diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 2b3cf0fa13d894424e6a0546993e2fd9998b8620..b30032c524b1e421301e0e9d1ffe83772b43d900 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -1,5 +1,6 @@ package theodolite.benchmark +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection @@ -27,8 +28,8 @@ class KubernetesBenchmarkDeployment( private val sutAfterActions: List<Action>, private val loadGenBeforeActions: List<Action>, private val loadGenAfterActions: List<Action>, - val appResources: List<KubernetesResource>, - val loadGenResources: List<KubernetesResource>, + val appResources: List<HasMetadata>, + val loadGenResources: List<HasMetadata>, private val loadGenerationDelay: Long, private val afterTeardownDelay: Long, private val kafkaConfig: Map<String, Any>, @@ -79,7 +80,7 @@ class KubernetesBenchmarkDeployment( labelName = LAG_EXPORTER_POD_LABEL_NAME, labelValue = LAG_EXPORTER_POD_LABEL_VALUE ) - logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." } + logger.info { "Teardown complete. Wait $afterTeardownDelay seconds to let everything cool down." } Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis()) } } diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ResourceSets.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ResourceSets.kt index b6364949727d4ea134e348ce8b79e22334753c1c..0626a6e24369348d50b60fbb555665c58dd17281 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ResourceSets.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ResourceSets.kt @@ -3,6 +3,7 @@ package theodolite.benchmark import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection @@ -19,13 +20,14 @@ class ResourceSets: KubernetesResource { @JsonInclude(JsonInclude.Include.NON_NULL) var fileSystem: FileSystemResourceSet? = null - fun loadResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, KubernetesResource>> { + fun loadResourceSet(client: NamespacedKubernetesClient): Collection<Pair<String, HasMetadata>> { + // TODO Find out whether field access (::configMap) is really what we want to do here (see #362) return if (::configMap != null) { - configMap?.getResourceSet(client= client) !! + configMap?.getResourceSet(client= client) !! } else if (::fileSystem != null) { - fileSystem?.getResourceSet(client= client ) !! + fileSystem?.getResourceSet(client= client ) !! } else { - throw DeploymentFailedException("could not load resourceSet.") + throw DeploymentFailedException("Could not load resourceSet.") } } } \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt b/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt index 7fb5417e200f64b0db74a8bebe69a751c5d484b8..7587e8326df98f3c45c016bfd3b2d7db8077e6d1 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt @@ -40,7 +40,7 @@ class ExternalSloChecker( val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) if (result.statusCode != 200) { counter++ - logger.error { "Could not reach external SLO checker." } + logger.error { "Could not reach external SLO checker at $externalSlopeURL." } } else { val booleanResult = result.text.toBoolean() logger.info { "SLO checker result is: $booleanResult." } @@ -48,6 +48,6 @@ class ExternalSloChecker( } } - throw ConnectException("Could not reach external SLO checker") + throw ConnectException("Could not reach external SLO checker at $externalSlopeURL.") } } diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt index d2f5906e2c4dcbe4eccba1fb3de9655145b692e2..089f40dc6b5ef7d8ac4b063cae68e5e9621d1f50 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt @@ -4,24 +4,24 @@ import theodolite.benchmark.BenchmarkExecution import theodolite.util.InvalidPatcherConfigurationException import javax.enterprise.context.ApplicationScoped -private const val CONSUMER_LAG_METRIC = "kafka_consumergroup_lag" -private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" +private const val DEFAULT_CONSUMER_LAG_METRIC_BASE = "kafka_consumergroup_lag" +private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" +private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" @ApplicationScoped class SloConfigHandler { companion object { fun getQueryString(slo: BenchmarkExecution.Slo): String { - return when (slo.sloType.toLowerCase()) { + return when (slo.sloType.lowercase()) { SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") - SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> { - var projection: String; - if (slo.properties.containsKey("consumerGroup")) { - projection = "consumergroup='" + slo.properties["consumerGroup"] + "'"; - } else projection = ""; - return "sum by(consumergroup) ($CONSUMER_LAG_METRIC{$projection} >= 0)"; - } - SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY - else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") + SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: + (slo.properties["consumerGroup"]?.let { "{consumergroup='$it'}" } ?: "").let { + "sum by(consumergroup) ($DEFAULT_CONSUMER_LAG_METRIC_BASE$it >= 0)" + } + SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY + SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY + SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY + else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type ${slo.sloType}") } } } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt index 93536282e2eefe6e476c3fde3fd86860fa24dcc3..84343ea7e8d7d420bcf320f36be02c39c41a1945 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt @@ -25,7 +25,12 @@ abstract class AbstractStateHandler<S : HasMetadata>( val resource = this.crdClient.withName(resourceName).get() if (resource != null) { val resourcePatched = setter(resource) - this.crdClient.patchStatus(resourcePatched) + // TODO replace with this.crdClient.replaceStatus(resourcePatched) with upcoming fabric8 release (> 5.12.1) + // find out the difference between patchStatus and replaceStatus + // see also https://github.com/fabric8io/kubernetes-client/pull/3798 + if (resourcePatched != null) { + this.crdClient.withName(resourcePatched.metadata.name).patchStatus(resourcePatched) + } } } catch (e: KubernetesClientException) { logger.warn(e) { "Status cannot be set for resource $resourceName." } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt index 1ce94c2fdd1ce13d50a21e01b9d4692c87d0da6f..558d06ce03074c38741b6c0a72c6ffa6eff96019 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt @@ -18,22 +18,22 @@ class LeaderElector( ) { // TODO(what is the name of the lock? .withName() or LeaseLock(..,name..) ?) - fun getLeadership(leader: KFunction0<Unit>) { + fun getLeadership(leader: () -> Unit) { val lockIdentity: String = UUID.randomUUID().toString() DefaultKubernetesClient().use { kc -> kc.leaderElector() .withConfig( LeaderElectionConfigBuilder() .withName("Theodolite") - .withLeaseDuration(Duration.ofSeconds(15L)) + .withLeaseDuration(Duration.ofSeconds(15)) .withLock(LeaseLock(client.namespace, name, lockIdentity)) - .withRenewDeadline(Duration.ofSeconds(10L)) - .withRetryPeriod(Duration.ofSeconds(2L)) + .withRenewDeadline(Duration.ofSeconds(10)) + .withRetryPeriod(Duration.ofSeconds(2)) .withLeaderCallbacks(LeaderCallbacks( { Thread { leader() }.start() }, - { logger.info { "STOPPED LEADERSHIP" } } + { logger.info { "Stop being the leading operator." } } ) { newLeader: String? -> - logger.info { "New leader elected $newLeader" } + logger.info { "New leader elected: $newLeader" } }) .build() ) diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt index 5f4180b0b4b58fa94b979c71998314baae63a91b..d9cb33b189da02b807301dde8550f2ae532d7e5a 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -130,8 +130,7 @@ class TheodoliteController( .list() .items .map { - it.spec.name = it.metadata.name - it + it.apply { it.spec.name = it.metadata.name } } } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt index 071bd06071345499d01595df72e5de4c8535b3fc..ada30ec945dd602dabe3ddb5f0e635a4eeea7b5f 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -41,15 +41,14 @@ class TheodoliteOperator { LeaderElector( client = client, name = Configuration.COMPONENT_NAME - ) - .getLeadership(::startOperator) + ).getLeadership(::startOperator) } /** * Start the operator. */ private fun startOperator() { - logger.info { "Using $namespace as namespace." } + logger.info { "Becoming the leading operator. Use namespace '$namespace'." } client.use { KubernetesDeserializer.registerCustomKind( "$GROUP/$API_VERSION", diff --git a/theodolite/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt b/theodolite/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt deleted file mode 100644 index 797ed88389947d66aa626ba2ef3fdf6732f8369d..0000000000000000000000000000000000000000 --- a/theodolite/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt +++ /dev/null @@ -1,47 +0,0 @@ -package theodolite.k8s - -import io.fabric8.kubernetes.api.model.KubernetesResource -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext -import mu.KotlinLogging - -private val logger = KotlinLogging.logger {} - -class CustomResourceWrapper( - val crAsMap: Map<String, String>, - private val context: CustomResourceDefinitionContext -) : KubernetesResource { - /** - * Deploy a service monitor - * - * @param client a namespaced Kubernetes client which are used to deploy the CR object. - * - * @throws java.io.IOException if the resource could not be deployed. - */ - fun deploy(client: NamespacedKubernetesClient) { - client.customResource(this.context) - .createOrReplace(client.configuration.namespace, this.crAsMap as Map<String, Any>) - } - - /** - * Delete a service monitor - * - * @param client a namespaced Kubernetes client which are used to delete the CR object. - */ - fun delete(client: NamespacedKubernetesClient) { - try { - client.customResource(this.context) - .delete(client.configuration.namespace, this.getName()) - } catch (e: Exception) { - logger.warn { "Could not delete custom resource" } - } - } - - /** - * @throws NullPointerException if name or metadata is null - */ - fun getName(): String { - val metadataAsMap = this.crAsMap["metadata"]!! as Map<String, String> - return metadataAsMap["name"]!! - } -} diff --git a/theodolite/src/main/kotlin/theodolite/k8s/K8sContextFactory.kt b/theodolite/src/main/kotlin/theodolite/k8s/K8sContextFactory.kt index 7eb209bfbab02bb94d34c985aa308173e509d4e4..38224f26a38a241e92b38e8b92a7fa5b4e198f5e 100644 --- a/theodolite/src/main/kotlin/theodolite/k8s/K8sContextFactory.kt +++ b/theodolite/src/main/kotlin/theodolite/k8s/K8sContextFactory.kt @@ -7,6 +7,7 @@ import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext * * @see CustomResourceDefinitionContext */ +@Deprecated("Use `CustomResourceDefinitionContext.Builder` instead.") class K8sContextFactory { /** diff --git a/theodolite/src/main/kotlin/theodolite/k8s/K8sManager.kt b/theodolite/src/main/kotlin/theodolite/k8s/K8sManager.kt index 389d5eefad556df502c218862e2f253ef8ad2100..5b4880b45db76d9e68e87fda0ece5b04966439c8 100644 --- a/theodolite/src/main/kotlin/theodolite/k8s/K8sManager.kt +++ b/theodolite/src/main/kotlin/theodolite/k8s/K8sManager.kt @@ -1,6 +1,7 @@ package theodolite.k8s import io.fabric8.kubernetes.api.model.ConfigMap +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.Service import io.fabric8.kubernetes.api.model.apps.Deployment @@ -21,49 +22,31 @@ class K8sManager(private val client: NamespacedKubernetesClient) { * Deploys different k8s resources using the client. * @throws IllegalArgumentException if KubernetesResource not supported. */ - fun deploy(resource: KubernetesResource) { - when (resource) { - is Deployment -> - this.client.apps().deployments().createOrReplace(resource) - is Service -> - this.client.services().createOrReplace(resource) - is ConfigMap -> - this.client.configMaps().createOrReplace(resource) - is StatefulSet -> - this.client.apps().statefulSets().createOrReplace(resource) - is CustomResourceWrapper -> resource.deploy(client) - else -> throw IllegalArgumentException("Unknown Kubernetes resource.") - } + fun deploy(resource: HasMetadata) { + client.resource(resource).createOrReplace() } /** * Removes different k8s resources using the client. * @throws IllegalArgumentException if KubernetesResource not supported. */ - fun remove(resource: KubernetesResource) { + fun remove(resource: HasMetadata) { + client.resource(resource).delete() when (resource) { is Deployment -> { - this.client.apps().deployments().delete(resource) ResourceByLabelHandler(client = client) .blockUntilPodsDeleted( matchLabels = resource.spec.selector.matchLabels ) logger.info { "Deployment '${resource.metadata.name}' deleted." } } - is Service -> - this.client.services().delete(resource) - is ConfigMap -> - this.client.configMaps().delete(resource) is StatefulSet -> { - this.client.apps().statefulSets().delete(resource) ResourceByLabelHandler(client = client) .blockUntilPodsDeleted( matchLabels = resource.spec.selector.matchLabels ) logger.info { "StatefulSet '$resource.metadata.name' deleted." } } - is CustomResourceWrapper -> resource.delete(client) - else -> throw IllegalArgumentException("Unknown Kubernetes resource.") } } } diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt deleted file mode 100644 index 36cfef9ce912886a638c200b502923dfe03ef5d0..0000000000000000000000000000000000000000 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt +++ /dev/null @@ -1,84 +0,0 @@ -package theodolite.k8s.resourceLoader - -import io.fabric8.kubernetes.api.model.KubernetesResource -import mu.KotlinLogging -import theodolite.k8s.K8sContextFactory - -private val logger = KotlinLogging.logger {} - -abstract class AbstractK8sLoader: K8sResourceLoader { - - fun loadK8sResource(kind: String, resourceString: String): KubernetesResource { - return when (kind.replaceFirst(kind[0],kind[0].uppercaseChar())) { - "Deployment" -> loadDeployment(resourceString) - "Service" -> loadService(resourceString) - "ServiceMonitor" -> loadServiceMonitor(resourceString) - "PodMonitor" -> loadPodMonitor(resourceString) - "ConfigMap" -> loadConfigmap(resourceString) - "StatefulSet" -> loadStatefulSet(resourceString) - "Execution" -> loadExecution(resourceString) - "Benchmark" -> loadBenchmark(resourceString) - else -> { - logger.error { "Error during loading of unspecified resource Kind '$kind'." } - throw IllegalArgumentException("error while loading resource with kind: $kind") - } - } - } - - fun <T : KubernetesResource> loadGenericResource(resourceString: String, f: (String) -> T): T { - var resource: T? = null - - try { - resource = f(resourceString) - } catch (e: Exception) { - logger.warn { e } - } - - if (resource == null) { - throw IllegalArgumentException("The Resource: $resourceString could not be loaded") - } - return resource - } - - - - override fun loadServiceMonitor(resource: String): KubernetesResource { - val context = K8sContextFactory().create( - api = "v1", - scope = "Namespaced", - group = "monitoring.coreos.com", - plural = "servicemonitors" - ) - return loadCustomResourceWrapper(resource, context) - } - - override fun loadPodMonitor(resource: String): KubernetesResource { - val context = K8sContextFactory().create( - api = "v1", - scope = "Namespaced", - group = "monitoring.coreos.com", - plural = "podmonitors" - ) - return loadCustomResourceWrapper(resource, context) - } - - override fun loadExecution(resource: String): KubernetesResource { - val context = K8sContextFactory().create( - api = "v1", - scope = "Namespaced", - group = "theodolite.com", - plural = "executions" - ) - return loadCustomResourceWrapper(resource, context) - } - - override fun loadBenchmark(resource: String): KubernetesResource { - val context = K8sContextFactory().create( - api = "v1", - scope = "Namespaced", - group = "theodolite.com", - plural = "benchmarks" - ) - return loadCustomResourceWrapper(resource, context) - } -} \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt deleted file mode 100644 index 1487b64bf4f7fbcc735539a429be9237d41205bc..0000000000000000000000000000000000000000 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt +++ /dev/null @@ -1,16 +0,0 @@ -package theodolite.k8s.resourceLoader - -import io.fabric8.kubernetes.api.model.KubernetesResource -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext - -interface K8sResourceLoader { - fun loadDeployment(resource: String): KubernetesResource - fun loadService(resource: String): KubernetesResource - fun loadStatefulSet(resource: String): KubernetesResource - fun loadExecution(resource: String): KubernetesResource - fun loadBenchmark(resource: String): KubernetesResource - fun loadConfigmap(resource: String): KubernetesResource - fun loadServiceMonitor(resource: String): KubernetesResource - fun loadPodMonitor(resource: String): KubernetesResource - fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): KubernetesResource -} \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromFile.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromFile.kt deleted file mode 100644 index 08f34e1d67c9821c9f9a07a49f4ba8683a072611..0000000000000000000000000000000000000000 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromFile.kt +++ /dev/null @@ -1,75 +0,0 @@ -package theodolite.k8s.resourceLoader - -import io.fabric8.kubernetes.api.model.ConfigMap -import io.fabric8.kubernetes.api.model.KubernetesResource -import io.fabric8.kubernetes.api.model.Service -import io.fabric8.kubernetes.api.model.apps.Deployment -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext -import theodolite.k8s.CustomResourceWrapper -import theodolite.util.YamlParserFromFile - -/** - * Used to load different Kubernetes resources. - * Supports: Deployments, Services, ConfigMaps, and CustomResources. - * @param client KubernetesClient used to deploy or remove. - */ -class K8sResourceLoaderFromFile(private val client: NamespacedKubernetesClient): AbstractK8sLoader(), - K8sResourceLoader { - - /** - * Parses a Service from a service yaml - * @param resource of the yaml file - * @return Service from fabric8 - */ - override fun loadService(resource: String): Service { - return loadGenericResource(resource) { x: String -> client.services().load(x).get() } - } - - - /** - * Parses a CustomResource from a yaml - * @param path of the yaml file - * @param context specific crd context for this custom resource - * @return CustomResourceWrapper from fabric8 - */ - override fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): CustomResourceWrapper { - return loadGenericResource(resource) { - CustomResourceWrapper( - YamlParserFromFile().parse( - resource, - HashMap<String, String>()::class.java - )!!, - context - ) - } - } - - /** - * Parses a Deployment from a Deployment yaml - * @param resource of the yaml file - * @return Deployment from fabric8 - */ - override fun loadDeployment(resource: String): Deployment { - return loadGenericResource(resource) { x: String -> client.apps().deployments().load(x).get() } - } - - /** - * Parses a ConfigMap from a ConfigMap yaml - * @param resource of the yaml file - * @return ConfigMap from fabric8 - */ - override fun loadConfigmap(resource: String): ConfigMap { - return loadGenericResource(resource) { x: String -> client.configMaps().load(x).get() } - } - - /** - * Parses a StatefulSet from a StatefulSet yaml - * @param resource of the yaml file - * @return StatefulSet from fabric8 - */ - override fun loadStatefulSet(resource: String): KubernetesResource { - return loadGenericResource(resource) { x: String -> client.apps().statefulSets().load(x).get() } - - } -} diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromString.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromString.kt deleted file mode 100644 index 639e4c4584d47968cd718d601f1cd7064d85eda2..0000000000000000000000000000000000000000 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoaderFromString.kt +++ /dev/null @@ -1,55 +0,0 @@ -package theodolite.k8s.resourceLoader - -import io.fabric8.kubernetes.api.model.ConfigMap -import io.fabric8.kubernetes.api.model.KubernetesResource -import io.fabric8.kubernetes.api.model.Service -import io.fabric8.kubernetes.api.model.apps.Deployment -import io.fabric8.kubernetes.api.model.apps.StatefulSet -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext -import theodolite.k8s.CustomResourceWrapper -import theodolite.util.YamlParserFromString -import java.io.ByteArrayInputStream -import java.io.InputStream - -class K8sResourceLoaderFromString(private val client: NamespacedKubernetesClient): AbstractK8sLoader(), - K8sResourceLoader { - - override fun loadService(resource: String): Service { - return loadAnyResource(resource) { stream -> client.services().load(stream).get() } - } - - override fun loadDeployment(resource: String): Deployment { - return loadAnyResource(resource) { stream -> client.apps().deployments().load(stream).get() } - } - - override fun loadConfigmap(resource: String): ConfigMap { - return loadAnyResource(resource) { stream -> client.configMaps().load(stream).get() } - } - - override fun loadStatefulSet(resource: String): StatefulSet { - return loadAnyResource(resource) { stream -> client.apps().statefulSets().load(stream).get() } - } - - private fun <T : KubernetesResource> loadAnyResource(resource: String, f: (InputStream) -> T): T { - return loadGenericResource(resource) { f(ByteArrayInputStream(it.encodeToByteArray())) } - } - - /** - * Parses a CustomResource from a yaml - * @param resource of the yaml file - * @param context specific crd context for this custom resource - * @return CustomResourceWrapper from fabric8 - */ - override fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): CustomResourceWrapper { - return loadGenericResource(resource) { - CustomResourceWrapper( - YamlParserFromString().parse( - resource, - HashMap<String, String>()::class.java - )!!, - context - ) - } - } -} \ No newline at end of file diff --git a/theodolite/src/main/kotlin/theodolite/util/PatcherDefinition.kt b/theodolite/src/main/kotlin/theodolite/util/PatcherDefinition.kt index 6ec0cce36751ec0343d40aa49fefa44f4c7fc918..fd2ac209a52e0d516ffa9ec07e465fa076ae665a 100644 --- a/theodolite/src/main/kotlin/theodolite/util/PatcherDefinition.kt +++ b/theodolite/src/main/kotlin/theodolite/util/PatcherDefinition.kt @@ -21,5 +21,5 @@ class PatcherDefinition { lateinit var resource: String @JsonSerialize - lateinit var properties: MutableMap<String, String> + lateinit var properties: Map<String, String> } diff --git a/theodolite/src/main/kotlin/theodolite/util/YamlParserFromFile.kt b/theodolite/src/main/kotlin/theodolite/util/YamlParserFromFile.kt index ae36349e628621bb7ad287d8cf557fbefa3ff5c5..58ca925e6aeeaca4f2f35c97c027ee2d24188e50 100644 --- a/theodolite/src/main/kotlin/theodolite/util/YamlParserFromFile.kt +++ b/theodolite/src/main/kotlin/theodolite/util/YamlParserFromFile.kt @@ -9,6 +9,7 @@ import java.io.InputStream /** * The YamlParser parses a YAML file */ +@Deprecated("Use Jackson ObjectMapper instead") class YamlParserFromFile : Parser { override fun <T> parse(path: String, E: Class<T>): T? { val input: InputStream = FileInputStream(File(path)) diff --git a/theodolite/src/main/kotlin/theodolite/util/YamlParserFromString.kt b/theodolite/src/main/kotlin/theodolite/util/YamlParserFromString.kt index 0e197908a501c0f6b89761a61989580b18e21f64..99c81f1ed674b2aa21f6aec7b3e0dff1b8c86840 100644 --- a/theodolite/src/main/kotlin/theodolite/util/YamlParserFromString.kt +++ b/theodolite/src/main/kotlin/theodolite/util/YamlParserFromString.kt @@ -6,6 +6,7 @@ import org.yaml.snakeyaml.constructor.Constructor /** * The YamlParser parses a YAML string */ +@Deprecated("Use Jackson ObjectMapper instead") class YamlParserFromString : Parser { override fun <T> parse(fileString: String, E: Class<T>): T? { val parser = Yaml(Constructor(E)) diff --git a/theodolite/src/test/kotlin/MockServerUtils.kt b/theodolite/src/test/kotlin/MockServerUtils.kt new file mode 100644 index 0000000000000000000000000000000000000000..0c8da45cd5d2dab948f9ccba1c2f4917050bb040 --- /dev/null +++ b/theodolite/src/test/kotlin/MockServerUtils.kt @@ -0,0 +1,20 @@ +import io.fabric8.kubernetes.api.model.APIResourceListBuilder +import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext +import io.fabric8.kubernetes.client.server.mock.KubernetesServer + +fun KubernetesServer.registerResource(context: ResourceDefinitionContext) { + val apiResourceList = APIResourceListBuilder() + .addNewResource() + .withName(context.plural) + .withKind(context.kind) + .withNamespaced(context.isNamespaceScoped) + .endResource() + .build() + + this + .expect() + .get() + .withPath("/apis/${context.group}/${context.version}") + .andReturn(200, apiResourceList) + .always() +} \ No newline at end of file diff --git a/theodolite/src/test/kotlin/theodolite/benchmark/ConfigMapResourceSetTest.kt b/theodolite/src/test/kotlin/theodolite/benchmark/ConfigMapResourceSetTest.kt index bc3263aa5fd06a8a19609d9f677db51f173cf54f..33a4572e368655744185312ff2352b1294d7bef6 100644 --- a/theodolite/src/test/kotlin/theodolite/benchmark/ConfigMapResourceSetTest.kt +++ b/theodolite/src/test/kotlin/theodolite/benchmark/ConfigMapResourceSetTest.kt @@ -1,32 +1,67 @@ package theodolite.benchmark -import com.google.gson.Gson +import com.fasterxml.jackson.databind.ObjectMapper import io.fabric8.kubernetes.api.model.* import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder import io.fabric8.kubernetes.api.model.apps.StatefulSet import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder +import io.fabric8.kubernetes.client.dsl.MixedOperation +import io.fabric8.kubernetes.client.dsl.Resource +import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.quarkus.test.junit.QuarkusTest +import io.quarkus.test.kubernetes.client.KubernetesTestServer +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import theodolite.k8s.CustomResourceWrapper -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile +import org.mockito.kotlin.mock +import registerResource +import theodolite.TestBenchmark +import theodolite.execution.operator.BenchmarkCRDummy +import theodolite.execution.operator.ExecutionClient +import theodolite.execution.operator.ExecutionEventHandler +import theodolite.execution.operator.ExecutionStateHandler +import theodolite.model.crd.BenchmarkCRD +import theodolite.model.crd.ExecutionCRD import theodolite.util.DeploymentFailedException +import java.io.FileInputStream -private const val testResourcePath = "./src/test/resources/k8s-resource-files/" +// TODO move somewhere else +typealias BenchmarkClient = MixedOperation<BenchmarkCRD, KubernetesResourceList<BenchmarkCRD>, Resource<BenchmarkCRD>> @QuarkusTest -class ConfigMapResourceSetTest { - private val server = KubernetesServer(false, true) +@WithKubernetesTestServer +internal class ConfigMapResourceSetTest { + + @KubernetesTestServer + private lateinit var server: KubernetesServer + + private val objectMapper: ObjectMapper = ObjectMapper() + + private lateinit var executionClient: ExecutionClient + private lateinit var benchmarkClient: BenchmarkClient @BeforeEach fun setUp() { server.before() + this.server.client + .apiextensions().v1() + .customResourceDefinitions() + .load(FileInputStream("crd/crd-execution.yaml")) + .create() + this.server.client + .apiextensions().v1() + .customResourceDefinitions() + .load(FileInputStream("crd/crd-benchmark.yaml")) + .create() + + this.executionClient = this.server.client.resources(ExecutionCRD::class.java) + this.benchmarkClient = this.server.client.resources(BenchmarkCRD::class.java) } @AfterEach @@ -34,184 +69,196 @@ class ConfigMapResourceSetTest { server.after() } - fun deployAndGetResource(resource: String): Collection<Pair<String, KubernetesResource>> { - val configMap1 = ConfigMapBuilder() + private fun deployAndGetResource(vararg resources: HasMetadata): ConfigMapResourceSet { + val configMap = ConfigMapBuilder() .withNewMetadata().withName("test-configmap").endMetadata() - .addToData("test-resource.yaml",resource) + .let { + resources.foldIndexed(it) { + i, b, r -> b.addToData("resource_$i.yaml", objectMapper.writeValueAsString(r)) + } + } .build() - server.client.configMaps().createOrReplace(configMap1) + server.client.configMaps().createOrReplace(configMap) val resourceSet = ConfigMapResourceSet() resourceSet.name = "test-configmap" - return resourceSet.getResourceSet(server.client) + return resourceSet } - @Test fun testLoadDeployment() { - val resourceBuilder = DeploymentBuilder() - resourceBuilder.withNewSpec().endSpec() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-deployment" + val resource = DeploymentBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-deployment") + .endMetadata() + .build() - val createdResource = deployAndGetResource(resource = Gson().toJson(resource)) + val createdResource = deployAndGetResource(resource).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is Deployment) - assertTrue(createdResource.toMutableSet().first().second.toString().contains(other = resource.metadata.name)) + assertTrue(createdResource.toList().first().second is Deployment) + assertTrue(createdResource.toList().first().second.toString().contains(other = resource.metadata.name)) } @Test fun testLoadStateFulSet() { - val resourceBuilder = StatefulSetBuilder() - resourceBuilder.withNewSpec().endSpec() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-resource" + val resource = StatefulSetBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-sts") + .endMetadata() + .build() - val createdResource = deployAndGetResource(resource = Gson().toJson(resource)) + val createdResource = deployAndGetResource(resource).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is StatefulSet) - assertTrue(createdResource.toMutableSet().first().second.toString().contains(other = resource.metadata.name)) + assertTrue(createdResource.toList().first().second is StatefulSet) + assertTrue(createdResource.toList().first().second.toString().contains(other = resource.metadata.name)) } @Test fun testLoadService() { - val resourceBuilder = ServiceBuilder() - resourceBuilder.withNewSpec().endSpec() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-resource" + val resource = ServiceBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-service") + .endMetadata() + .build() - val createdResource = deployAndGetResource(resource = Gson().toJson(resource)) + val createdResource = deployAndGetResource(resource).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is Service) - assertTrue(createdResource.toMutableSet().first().second.toString().contains(other = resource.metadata.name)) + assertTrue(createdResource.toList().first().second is Service) + assertTrue(createdResource.toList().first().second.toString().contains(other = resource.metadata.name)) } @Test fun testLoadConfigMap() { - val resourceBuilder = ConfigMapBuilder() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-resource" + val resource = ConfigMapBuilder() + .withNewMetadata() + .withName("test-configmap") + .endMetadata() + .build() - val createdResource = deployAndGetResource(resource = Gson().toJson(resource)) + val createdResource = deployAndGetResource(resource).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is ConfigMap) - assertTrue(createdResource.toMutableSet().first().second.toString().contains(other = resource.metadata.name)) + assertTrue(createdResource.toList().first().second is ConfigMap) + assertTrue(createdResource.toList().first().second.toString().contains(other = resource.metadata.name)) } @Test fun testLoadExecution() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Execution", testResourcePath + "test-execution.yaml") as CustomResourceWrapper - val createdResource = deployAndGetResource(resource = Gson().toJson(resource.crAsMap)) + val stream = javaClass.getResourceAsStream("/k8s-resource-files/test-execution.yaml") + val execution = this.executionClient.load(stream).get() + val createdResource = deployAndGetResource(execution).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is CustomResourceWrapper) + val loadedResource = createdResource.toList().first().second + assertTrue(loadedResource is ExecutionCRD) + assertEquals("example-execution", loadedResource.metadata.name) + - val loadedResource = createdResource.toMutableSet().first().second - if (loadedResource is CustomResourceWrapper){ - assertTrue(loadedResource.getName() == "example-execution") - } } @Test fun testLoadBenchmark() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Benchmark", testResourcePath + "test-benchmark.yaml") as CustomResourceWrapper - val createdResource = deployAndGetResource(resource = Gson().toJson(resource.crAsMap)) + val benchmark = BenchmarkCRDummy("example-benchmark").getCR() + val createdResource = deployAndGetResource(benchmark).getResourceSet(server.client) assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is CustomResourceWrapper) - - val loadedResource = createdResource.toMutableSet().first().second - if (loadedResource is CustomResourceWrapper){ - assertTrue(loadedResource.getName() == "example-benchmark") - } + val loadedResource = createdResource.toList().first().second + assertTrue(loadedResource is BenchmarkCRD) + assertEquals("example-benchmark", loadedResource.metadata.name) } @Test fun testLoadServiceMonitor() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("ServiceMonitor", testResourcePath + "test-service-monitor.yaml") as CustomResourceWrapper - val createdResource = deployAndGetResource(resource = Gson().toJson(resource.crAsMap)) + val serviceMonitorContext = ResourceDefinitionContext.Builder() + .withGroup("monitoring.coreos.com") + .withKind("ServiceMonitor") + .withPlural("servicemonitors") + .withNamespaced(true) + .withVersion("v1") + .build() + server.registerResource(serviceMonitorContext) - assertEquals(1, createdResource.size) - assertTrue(createdResource.toMutableSet().first().second is CustomResourceWrapper) + val stream = javaClass.getResourceAsStream("/k8s-resource-files/test-service-monitor.yaml") + val serviceMonitor = server.client.load(stream).get()[0] + val createdResource = deployAndGetResource(serviceMonitor).getResourceSet(server.client) - val loadedResource = createdResource.toMutableSet().first().second - if (loadedResource is CustomResourceWrapper){ - assertTrue(loadedResource.getName() == "test-service-monitor") - } + assertEquals(1, createdResource.size) + val loadedResource = createdResource.toList().first().second + assertTrue(loadedResource is GenericKubernetesResource) + assertEquals("ServiceMonitor", loadedResource.kind) + assertEquals("test-service-monitor", loadedResource.metadata.name) } @Test fun testMultipleFiles(){ - val resourceBuilder = DeploymentBuilder() - resourceBuilder.withNewSpec().endSpec() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-deployment" - - val resourceBuilder1 = ConfigMapBuilder() - resourceBuilder1.withNewMetadata().endMetadata() - val resource1 = resourceBuilder1.build() - resource1.metadata.name = "test-configmap" - - val configMap1 = ConfigMapBuilder() - .withNewMetadata().withName("test-configmap").endMetadata() - .addToData("test-deployment.yaml",Gson().toJson(resource)) - .addToData("test-configmap.yaml",Gson().toJson(resource1)) + val deployment = DeploymentBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-deployment") + .endMetadata() + .build() + val configMap = ConfigMapBuilder() + .withNewMetadata() + .withName("test-configmap") + .endMetadata() .build() - server.client.configMaps().createOrReplace(configMap1) - - val resourceSet = ConfigMapResourceSet() - resourceSet.name = "test-configmap" - - val createdResourcesSet = resourceSet.getResourceSet(server.client) + val createdResourceSet = deployAndGetResource(deployment, configMap).getResourceSet(server.client) - assertEquals(2,createdResourcesSet.size ) - assert(createdResourcesSet.toMutableList()[0].second is Deployment) - assert(createdResourcesSet.toMutableList()[1].second is ConfigMap) + assertEquals(2, createdResourceSet.size ) + assert(createdResourceSet.toList()[0].second is Deployment) + assert(createdResourceSet.toList()[1].second is ConfigMap) } @Test - fun testFileIsSet(){ - val resourceBuilder = DeploymentBuilder() - resourceBuilder.withNewSpec().endSpec() - resourceBuilder.withNewMetadata().endMetadata() - val resource = resourceBuilder.build() - resource.metadata.name = "test-deployment" - - val resourceBuilder1 = ConfigMapBuilder() - resourceBuilder1.withNewMetadata().endMetadata() - val resource1 = resourceBuilder1.build() - resource1.metadata.name = "test-configmap" - - val configMap1 = ConfigMapBuilder() - .withNewMetadata().withName("test-configmap").endMetadata() - .addToData("test-deployment.yaml",Gson().toJson(resource)) - .addToData("test-configmap.yaml",Gson().toJson(resource1)) + fun testFilesRestricted() { + val deployment = DeploymentBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-deployment") + .endMetadata() + .build() + val configMap = ConfigMapBuilder() + .withNewMetadata() + .withName("test-configmap") + .endMetadata() .build() - server.client.configMaps().createOrReplace(configMap1) - - val resourceSet = ConfigMapResourceSet() - resourceSet.name = "test-configmap" - resourceSet.files = listOf("test-deployment.yaml") + val createdResourceSet = deployAndGetResource(deployment, configMap) + val allResources = createdResourceSet.getResourceSet(server.client) + assertEquals(2, allResources.size) + createdResourceSet.files = listOf(allResources.first().first) // only select first file from ConfigMa + val resources = createdResourceSet.getResourceSet(server.client) + assertEquals(1, resources.size) + assertTrue(resources.toList().first().second is Deployment) + } - val createdResourcesSet = resourceSet.getResourceSet(server.client) + @Test + fun testFileNotExist() { + val resource = DeploymentBuilder() + .withNewSpec() + .endSpec() + .withNewMetadata() + .withName("test-deployment") + .endMetadata() + .build() - assertEquals(1, createdResourcesSet.size ) - assert(createdResourcesSet.toMutableSet().first().second is Deployment) + val resourceSet = deployAndGetResource(resource) + resourceSet.files = listOf("non-existing-file.yaml") + assertThrows<DeploymentFailedException> { + resourceSet.getResourceSet(server.client) + } } - @Test fun testConfigMapNotExist() { val resourceSet = ConfigMapResourceSet() diff --git a/theodolite/src/test/kotlin/theodolite/benchmark/FileSystemResourceSetTest.kt b/theodolite/src/test/kotlin/theodolite/benchmark/FileSystemResourceSetTest.kt index f15685c8e0ecd67b99caabb77f68cc35a78b47f2..6a31875d00c8f578dcc475c3de21e130c595f673 100644 --- a/theodolite/src/test/kotlin/theodolite/benchmark/FileSystemResourceSetTest.kt +++ b/theodolite/src/test/kotlin/theodolite/benchmark/FileSystemResourceSetTest.kt @@ -1,28 +1,53 @@ package theodolite.benchmark -import io.fabric8.kubernetes.api.model.ConfigMap -import io.fabric8.kubernetes.api.model.Service +import io.fabric8.kubernetes.api.model.* import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.api.model.apps.StatefulSet +import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext import io.fabric8.kubernetes.client.server.mock.KubernetesServer -import org.junit.jupiter.api.AfterEach +import io.quarkus.test.junit.QuarkusTest +import io.quarkus.test.kubernetes.client.KubernetesTestServer +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import theodolite.k8s.CustomResourceWrapper +import org.junit.jupiter.api.io.TempDir +import registerResource +import theodolite.model.crd.BenchmarkCRD +import theodolite.model.crd.ExecutionCRD import theodolite.util.DeploymentFailedException +import java.io.FileInputStream +import java.nio.file.Files +import java.nio.file.Path -private const val testResourcePath = "./src/test/resources/k8s-resource-files/" - +@QuarkusTest +@WithKubernetesTestServer class FileSystemResourceSetTest { - private val server = KubernetesServer(false, true) + @KubernetesTestServer + private lateinit var server: KubernetesServer + + @TempDir + @JvmField + final var tempDir: Path? = null @BeforeEach fun setUp() { server.before() + this.server.client + .apiextensions().v1() + .customResourceDefinitions() + .load(FileInputStream("crd/crd-execution.yaml")) + .create() + this.server.client + .apiextensions().v1() + .customResourceDefinitions() + .load(FileInputStream("crd/crd-benchmark.yaml")) + .create() + + // Apparently we need create CRD clients once + this.server.client.resources(ExecutionCRD::class.java) + this.server.client.resources(BenchmarkCRD::class.java) } @AfterEach @@ -30,80 +55,113 @@ class FileSystemResourceSetTest { server.after() } + private fun copyTestResourceFile(fileName: String, tempDir: Path) { + val stream = javaClass.getResourceAsStream("/k8s-resource-files/$fileName") + ?: throw IllegalArgumentException("File does not exist") + val target = tempDir.resolve(fileName) + Files.copy(stream, target) + } + @Test - fun testLoadDeployment() { + fun testLoadDeployment(@TempDir tempDir: Path) { + copyTestResourceFile("test-deployment.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-deployment.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is Deployment) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is Deployment) } @Test - fun testLoadService() { + fun testLoadService(@TempDir tempDir: Path) { + copyTestResourceFile("test-service.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-service.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is Service) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is Service) } @Test - fun testLoadStatefulSet() { + fun testLoadStatefulSet(@TempDir tempDir: Path) { + copyTestResourceFile("test-statefulset.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-statefulset.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is StatefulSet) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is StatefulSet) } @Test - fun testLoadConfigMap() { + fun testLoadConfigMap(@TempDir tempDir: Path) { + copyTestResourceFile("test-configmap.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-configmap.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is ConfigMap) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is ConfigMap) } @Test - fun testLoadServiceMonitor() { + fun testLoadServiceMonitor(@TempDir tempDir: Path) { + val serviceMonitorContext = ResourceDefinitionContext.Builder() + .withGroup("monitoring.coreos.com") + .withKind("ServiceMonitor") + .withPlural("servicemonitors") + .withNamespaced(true) + .withVersion("v1") + .build() + server.registerResource(serviceMonitorContext) + + copyTestResourceFile("test-service-monitor.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-service-monitor.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is CustomResourceWrapper) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is GenericKubernetesResource) } @Test - fun testLoadBenchmark() { + fun testLoadBenchmark(@TempDir tempDir: Path) { + copyTestResourceFile("test-benchmark.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-benchmark.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is CustomResourceWrapper) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is BenchmarkCRD) } @Test - fun testLoadExecution() { + fun testLoadExecution(@TempDir tempDir: Path) { + copyTestResourceFile("test-execution.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath + resourceSet.path = tempDir.toString() resourceSet.files = listOf("test-execution.yaml") - assertEquals(1,resourceSet.getResourceSet(server.client).size) - assertTrue(resourceSet.getResourceSet(server.client).toMutableSet().first().second is CustomResourceWrapper) + assertEquals(1, resourceSet.getResourceSet(server.client).size) + assertTrue(resourceSet.getResourceSet(server.client).toList().first().second is ExecutionCRD) } @Test - fun testFilesNotSet() { + fun testFilesNotSet(@TempDir tempDir: Path) { + copyTestResourceFile("test-deployment.yaml", tempDir) + copyTestResourceFile("test-service.yaml", tempDir) + val resourceSet = FileSystemResourceSet() - resourceSet.path = testResourcePath - assertEquals(9,resourceSet.getResourceSet(server.client).size) + resourceSet.path = tempDir.toString() + assertEquals(2, resourceSet.getResourceSet(server.client).size) } @Test - fun testWrongPath() { + fun testWrongPath(@TempDir tempDir: Path) { val resourceSet = FileSystemResourceSet() - resourceSet.path = "/abc/not-exist" + resourceSet.path = "/not/existing/path" assertThrows<DeploymentFailedException> { resourceSet.getResourceSet(server.client) } diff --git a/theodolite/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt b/theodolite/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt index 138f79eadc6bdee17e62cc7a961eb7de539fa3df..ba65356b65b6ac23ca56c268bb003815917cf162 100644 --- a/theodolite/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt +++ b/theodolite/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt @@ -11,14 +11,12 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import theodolite.k8s.K8sManager -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile +import theodolite.model.crd.ExecutionCRD import theodolite.model.crd.ExecutionState -import java.time.Duration @QuarkusTest @WithKubernetesTestServer class StateHandlerTest { - private val testResourcePath = "./src/test/resources/k8s-resource-files/" @KubernetesTestServer private lateinit var server: KubernetesServer @@ -26,8 +24,8 @@ class StateHandlerTest { @BeforeEach fun setUp() { server.before() - val executionResource = K8sResourceLoaderFromFile(server.client) - .loadK8sResource("Execution", testResourcePath + "test-execution.yaml") + val executionStream = javaClass.getResourceAsStream("/k8s-resource-files/test-execution.yaml") + val executionResource = server.client.resources(ExecutionCRD::class.java).load(executionStream).get() K8sManager(server.client).deploy(executionResource) } diff --git a/theodolite/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt b/theodolite/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt index ffc3f2f2b8083ab8b8170fa77c19de3a6ef387e7..ee80d55caf995642f6fff04cfeeb66bc08ab93d3 100644 --- a/theodolite/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt +++ b/theodolite/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt @@ -1,32 +1,31 @@ package theodolite.k8s -import com.fasterxml.jackson.annotation.JsonIgnoreProperties import io.fabric8.kubernetes.api.model.* import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder import io.fabric8.kubernetes.api.model.apps.StatefulSet import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder +import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.quarkus.test.junit.QuarkusTest -import org.json.JSONObject -import org.junit.jupiter.api.AfterEach +import io.quarkus.test.kubernetes.client.KubernetesTestServer +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile +import registerResource + @QuarkusTest -@JsonIgnoreProperties(ignoreUnknown = true) -class K8sManagerTest { - @JsonIgnoreProperties(ignoreUnknown = true) - private final val server = KubernetesServer(false, true) - private final val testResourcePath = "./src/test/resources/k8s-resource-files/" +@WithKubernetesTestServer +internal class K8sManagerTest { + + @KubernetesTestServer + private lateinit var server: KubernetesServer private final val resourceName = "test-resource" private final val metadata: ObjectMeta = ObjectMetaBuilder().withName(resourceName).build() - val defaultDeployment: Deployment = DeploymentBuilder() .withMetadata(metadata) .withNewSpec() @@ -53,18 +52,6 @@ class K8sManagerTest { .withMetadata(metadata) .build() - @BeforeEach - fun setUp() { - server.before() - - } - - @AfterEach - fun tearDown() { - server.after() - - } - @Test @DisplayName("Test handling of Deployments") fun handleDeploymentTest() { @@ -121,32 +108,29 @@ class K8sManagerTest { @Test @DisplayName("Test handling of custom resources") fun handleCustomResourcesTest() { - val manager = K8sManager(server.client) - val servicemonitor = K8sResourceLoaderFromFile(server.client) - .loadK8sResource("ServiceMonitor", testResourcePath + "test-service-monitor.yaml") + val serviceMonitorContext = ResourceDefinitionContext.Builder() + .withGroup("monitoring.coreos.com") + .withKind("ServiceMonitor") + .withPlural("servicemonitors") + .withNamespaced(true) + .withVersion("v1") + .build() + server.registerResource(serviceMonitorContext) - val serviceMonitorContext = K8sContextFactory().create( - api = "v1", - scope = "Namespaced", - group = "monitoring.coreos.com", - plural = "servicemonitors" - ) - manager.deploy(servicemonitor) + val manager = K8sManager(server.client) - var serviceMonitors = JSONObject(server.client.customResource(serviceMonitorContext).list()) - .getJSONArray("items") + val serviceMonitorStream = javaClass.getResourceAsStream("/k8s-resource-files/test-service-monitor.yaml") + val serviceMonitor = server.client.load(serviceMonitorStream).get()[0] - assertEquals(1, serviceMonitors.length()) - assertEquals( - "test-service-monitor", - serviceMonitors.getJSONObject(0).getJSONObject("metadata").getString("name") - ) + manager.deploy(serviceMonitor) - manager.remove(servicemonitor) + val serviceMonitorsDeployed = server.client.genericKubernetesResources(serviceMonitorContext).list() + assertEquals(1, serviceMonitorsDeployed.items.size) + assertEquals("test-service-monitor", serviceMonitorsDeployed.items[0].metadata.name) - serviceMonitors = JSONObject(server.client.customResource(serviceMonitorContext).list()) - .getJSONArray("items") + manager.remove(serviceMonitor) - assertEquals(0, serviceMonitors.length()) + val serviceMonitorsDeleted = server.client.genericKubernetesResources(serviceMonitorContext).list() + assertEquals(0, serviceMonitorsDeleted.items.size) } } \ No newline at end of file diff --git a/theodolite/src/test/kotlin/theodolite/k8s/K8sResourceLoaderTest.kt b/theodolite/src/test/kotlin/theodolite/k8s/K8sResourceLoaderTest.kt deleted file mode 100644 index 4a41dac8b27b9d4ddcfb9915f759b14ea4eaba4a..0000000000000000000000000000000000000000 --- a/theodolite/src/test/kotlin/theodolite/k8s/K8sResourceLoaderTest.kt +++ /dev/null @@ -1,111 +0,0 @@ -package theodolite.k8s - -import io.fabric8.kubernetes.api.model.ConfigMap -import io.fabric8.kubernetes.api.model.Service -import io.fabric8.kubernetes.api.model.apps.Deployment -import io.fabric8.kubernetes.api.model.apps.StatefulSet -import io.fabric8.kubernetes.client.server.mock.KubernetesServer -import io.quarkus.test.junit.QuarkusTest -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile - -@QuarkusTest -class K8sResourceLoaderTest { - private final val server = KubernetesServer(false, true) - private final val testResourcePath = "./src/test/resources/k8s-resource-files/" - - @BeforeEach - fun setUp() { - server.before() - } - - @AfterEach - fun tearDown() { - server.after() - } - - @Test - @DisplayName("Test loading of Deployments") - fun loadDeploymentTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Deployment", testResourcePath + "test-deployment.yaml") - - assertTrue(resource is Deployment) - assertTrue(resource.toString().contains("name=test-deployment")) - } - - @Test - @DisplayName("Test loading of StatefulSet") - fun loadStatefulSetTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("StatefulSet", testResourcePath + "test-statefulset.yaml") - - assertTrue(resource is StatefulSet) - assertTrue(resource.toString().contains("name=test-statefulset")) - } - - @Test - @DisplayName("Test loading of Service") - fun loadServiceTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Service", testResourcePath + "test-service.yaml") - - assertTrue(resource is Service) - assertTrue(resource.toString().contains("name=test-service")) - } - - @Test - @DisplayName("Test loading of ConfigMap") - fun loadConfigMapTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("ConfigMap", testResourcePath + "test-configmap.yaml") - - assertTrue(resource is ConfigMap) - assertTrue(resource.toString().contains("name=test-configmap")) - } - - @Test - @DisplayName("Test loading of ServiceMonitors") - fun loadServiceMonitorTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("ServiceMonitor", testResourcePath + "test-service-monitor.yaml") - - assertTrue(resource is CustomResourceWrapper) - if (resource is CustomResourceWrapper) { - assertEquals("test-service-monitor", resource.getName()) - - } - } - - @Test - @DisplayName("Test loading of Executions") - fun loadExecutionTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Execution", testResourcePath + "test-execution.yaml") - - assertTrue(resource is CustomResourceWrapper) - if (resource is CustomResourceWrapper) { - assertEquals("example-execution", resource.getName()) - - } - } - - @Test - @DisplayName("Test loading of Benchmarks") - fun loadBenchmarkTest() { - val loader = K8sResourceLoaderFromFile(server.client) - val resource = loader.loadK8sResource("Benchmark", testResourcePath + "test-benchmark.yaml") - - assertTrue(resource is CustomResourceWrapper) - if (resource is CustomResourceWrapper) { - assertEquals("example-benchmark", resource.getName()) - - } - } - -} \ No newline at end of file diff --git a/theodolite/src/test/kotlin/theodolite/ResourceLimitPatcherTest.kt b/theodolite/src/test/kotlin/theodolite/patcher/ResourceLimitPatcherTest.kt similarity index 69% rename from theodolite/src/test/kotlin/theodolite/ResourceLimitPatcherTest.kt rename to theodolite/src/test/kotlin/theodolite/patcher/ResourceLimitPatcherTest.kt index b7fc2d9f1b2d5110f974b3805584baa3903d5eb1..2769f2fef607a03d820b0821969db98894944cb3 100644 --- a/theodolite/src/test/kotlin/theodolite/ResourceLimitPatcherTest.kt +++ b/theodolite/src/test/kotlin/theodolite/patcher/ResourceLimitPatcherTest.kt @@ -1,11 +1,12 @@ -package theodolite +package theodolite.patcher -import io.fabric8.kubernetes.api.model.apps.Deployment -import io.fabric8.kubernetes.client.DefaultKubernetesClient +import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.quarkus.test.junit.QuarkusTest +import io.quarkus.test.kubernetes.client.KubernetesTestServer +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile import theodolite.patcher.PatcherFactory import theodolite.util.PatcherDefinition @@ -20,40 +21,44 @@ import theodolite.util.PatcherDefinition * Case 4: In the given YAML declaration neither `Resource Request` nor `Request Limit` is defined */ @QuarkusTest +@WithKubernetesTestServer +@Disabled class ResourceLimitPatcherTest { - val testPath = "./src/test/resources/" - val loader = K8sResourceLoaderFromFile(DefaultKubernetesClient().inNamespace("")) + val patcherFactory = PatcherFactory() + @KubernetesTestServer + private lateinit var server: KubernetesServer + fun applyTest(fileName: String) { val cpuValue = "50m" val memValue = "3Gi" - val k8sResource = loader.loadK8sResource("Deployment", testPath + fileName) as Deployment + val k8sResource = server.client.apps().deployments().load(javaClass.getResourceAsStream(fileName)).get() val defCPU = PatcherDefinition() - defCPU.resource = "cpu-memory-deployment.yaml" + defCPU.resource = "/cpu-memory-deployment.yaml" defCPU.type = "ResourceLimitPatcher" - defCPU.properties = mutableMapOf( + defCPU.properties = mapOf( "limitedResource" to "cpu", "container" to "application" ) val defMEM = PatcherDefinition() - defMEM.resource = "cpu-memory-deployment.yaml" + defMEM.resource = "/cpu-memory-deployment.yaml" defMEM.type = "ResourceLimitPatcher" - defMEM.properties = mutableMapOf( + defMEM.properties = mapOf( "limitedResource" to "memory", "container" to "uc-application" ) patcherFactory.createPatcher( patcherDefinition = defCPU, - k8sResources = listOf(Pair("cpu-memory-deployment.yaml", k8sResource)) + k8sResources = listOf(Pair("/cpu-memory-deployment.yaml", k8sResource)) ).patch(value = cpuValue) patcherFactory.createPatcher( patcherDefinition = defMEM, - k8sResources = listOf(Pair("cpu-memory-deployment.yaml", k8sResource)) + k8sResources = listOf(Pair("/cpu-memory-deployment.yaml", k8sResource)) ).patch(value = memValue) k8sResource.spec.template.spec.containers.filter { it.name == defCPU.properties["container"]!! } @@ -66,24 +71,24 @@ class ResourceLimitPatcherTest { @Test fun testWithExistingCpuAndMemoryDeclarations() { // Case 1: In the given YAML declaration memory and cpu are defined - applyTest("cpu-memory-deployment.yaml") + applyTest("/cpu-memory-deployment.yaml") } @Test fun testOnlyWithExistingCpuDeclarations() { // Case 2: In the given YAML declaration only cpu is defined - applyTest("cpu-deployment.yaml") + applyTest("/cpu-deployment.yaml") } @Test fun testOnlyWithExistingMemoryDeclarations() { // Case 3: In the given YAML declaration only memory is defined - applyTest("memory-deployment.yaml") + applyTest("/memory-deployment.yaml") } @Test fun testWithoutResourceDeclarations() { // Case 4: In the given YAML declaration neither `Resource Request` nor `Request Limit` is defined - applyTest("no-resources-deployment.yaml") + applyTest("/no-resources-deployment.yaml") } } diff --git a/theodolite/src/test/kotlin/theodolite/ResourceRequestPatcherTest.kt b/theodolite/src/test/kotlin/theodolite/patcher/ResourceRequestPatcherTest.kt similarity index 69% rename from theodolite/src/test/kotlin/theodolite/ResourceRequestPatcherTest.kt rename to theodolite/src/test/kotlin/theodolite/patcher/ResourceRequestPatcherTest.kt index 8794d4dc2d67b8af78f4fa409c727f882922d0b8..dba91eb65d4474d38f64d7fdd7f7ab981f8eb30f 100644 --- a/theodolite/src/test/kotlin/theodolite/ResourceRequestPatcherTest.kt +++ b/theodolite/src/test/kotlin/theodolite/patcher/ResourceRequestPatcherTest.kt @@ -1,12 +1,11 @@ -package theodolite +package theodolite.patcher -import io.fabric8.kubernetes.api.model.apps.Deployment -import io.fabric8.kubernetes.client.DefaultKubernetesClient +import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.quarkus.test.junit.QuarkusTest +import io.quarkus.test.kubernetes.client.KubernetesTestServer +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer import io.smallrye.common.constraint.Assert.assertTrue import org.junit.jupiter.api.Test -import theodolite.k8s.resourceLoader.K8sResourceLoaderFromFile -import theodolite.patcher.PatcherFactory import theodolite.util.PatcherDefinition /** @@ -20,39 +19,42 @@ import theodolite.util.PatcherDefinition * Case 4: In the given YAML declaration neither `Resource Request` nor `Request Limit` is defined */ @QuarkusTest +@WithKubernetesTestServer class ResourceRequestPatcherTest { - val testPath = "./src/test/resources/" - val loader = K8sResourceLoaderFromFile(DefaultKubernetesClient().inNamespace("")) + + @KubernetesTestServer + private lateinit var server: KubernetesServer + val patcherFactory = PatcherFactory() fun applyTest(fileName: String) { val cpuValue = "50m" val memValue = "3Gi" - val k8sResource = loader.loadK8sResource("Deployment", testPath + fileName) as Deployment + val k8sResource = server.client.apps().deployments().load(javaClass.getResourceAsStream(fileName)).get() val defCPU = PatcherDefinition() - defCPU.resource = "cpu-memory-deployment.yaml" + defCPU.resource = "/cpu-memory-deployment.yaml" defCPU.type = "ResourceRequestPatcher" - defCPU.properties = mutableMapOf( + defCPU.properties = mapOf( "requestedResource" to "cpu", "container" to "application" ) val defMEM = PatcherDefinition() - defMEM.resource = "cpu-memory-deployment.yaml" + defMEM.resource = "/cpu-memory-deployment.yaml" defMEM.type = "ResourceRequestPatcher" - defMEM.properties = mutableMapOf( + defMEM.properties = mapOf( "requestedResource" to "memory", "container" to "application" ) patcherFactory.createPatcher( patcherDefinition = defCPU, - k8sResources = listOf(Pair("cpu-memory-deployment.yaml", k8sResource)) + k8sResources = listOf(Pair("/cpu-memory-deployment.yaml", k8sResource)) ).patch(value = cpuValue) patcherFactory.createPatcher( patcherDefinition = defMEM, - k8sResources = listOf(Pair("cpu-memory-deployment.yaml", k8sResource)) + k8sResources = listOf(Pair("/cpu-memory-deployment.yaml", k8sResource)) ).patch(value = memValue) k8sResource.spec.template.spec.containers.filter { it.name == defCPU.properties["container"]!! } @@ -65,24 +67,24 @@ class ResourceRequestPatcherTest { @Test fun testWithExistingCpuAndMemoryDeclarations() { // Case 1: In the given YAML declaration memory and cpu are defined - applyTest("cpu-memory-deployment.yaml") + applyTest("/cpu-memory-deployment.yaml") } @Test fun testOnlyWithExistingCpuDeclarations() { // Case 2: In the given YAML declaration only cpu is defined - applyTest("cpu-deployment.yaml") + applyTest("/cpu-deployment.yaml") } @Test fun testOnlyWithExistingMemoryDeclarations() { // Case 3: In the given YAML declaration only memory is defined - applyTest("memory-deployment.yaml") + applyTest("/memory-deployment.yaml") } @Test fun testWithoutResourceDeclarations() { // Case 4: In the given YAML declaration neither `Resource Request` nor `Request Limit` is defined - applyTest("no-resources-deployment.yaml") + applyTest("/no-resources-deployment.yaml") } } diff --git a/theodolite/src/test/kotlin/theodolite/util/IOHandlerTest.kt b/theodolite/src/test/kotlin/theodolite/util/IOHandlerTest.kt index f84536bfc029a829c1798293938386965eedcf47..3b31f389bdeb35e6016a56a98abb1e13bf83ae18 100644 --- a/theodolite/src/test/kotlin/theodolite/util/IOHandlerTest.kt +++ b/theodolite/src/test/kotlin/theodolite/util/IOHandlerTest.kt @@ -12,7 +12,6 @@ import org.junit.rules.TemporaryFolder import org.junitpioneer.jupiter.ClearEnvironmentVariable import org.junitpioneer.jupiter.SetEnvironmentVariable - const val FOLDER_URL = "Test-Folder" @QuarkusTest @@ -56,11 +55,12 @@ internal class IOHandlerTest { columns = columns ) - var expected = "Fruit,Color\n" - testContent.forEach { expected += it[0] + "," + it[1] + "\n" } + val expected = (listOf(listOf("Fruit", "Color")) + testContent) + .map { "${it[0]},${it[1]}" } + .reduce { left, right -> left + System.lineSeparator() + right } assertEquals( - expected.trim(), + expected, IOHandler().readFileAsString("${folder.absolutePath}/test-file.csv") ) } diff --git a/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml b/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml index 1ba204bb2821f9b734706d322322b28220ef19d5..ea9ee8471d3da1dc6011348bd978696bd0fa6f36 100644 --- a/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml +++ b/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml @@ -3,14 +3,19 @@ kind: benchmark metadata: name: example-benchmark spec: - appResource: - - "uc1-kstreams-deployment.yaml" - - "aggregation-service.yaml" - - "jmx-configmap.yaml" - - "uc1-service-monitor.yaml" - loadGenResource: - - "uc1-load-generator-deployment.yaml" - - "uc1-load-generator-service.yaml" + sut: + resources: + - configMap: + name: "example-configmap" + files: + - "uc1-kstreams-deployment.yaml" + loadGenerator: + resources: + - configMap: + name: "example-configmap" + files: + - uc1-load-generator-service.yaml + - uc1-load-generator-deployment.yaml resourceTypes: - typeName: "Instances" patchers: @@ -35,4 +40,4 @@ spec: numPartitions: 40 replicationFactor: 1 - name: "theodolite-.*" - removeOnly: True \ No newline at end of file + removeOnly: True