diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a8bf42eddaa0b4896d853b7935040774f300353b..0dda0bdb6be4434c91801cb6665364fb7fd63d6a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -18,4 +18,15 @@ benchmarks: - benchmarks/* - when: manual allow_failure: true - \ No newline at end of file + +execution: + stage: triggers + trigger: + include: execution/.gitlab-ci.yml + strategy: depend + rules: + - if: "$CI_COMMIT_TAG" + - changes: + - execution/* + - when: manual + allow_failure: true diff --git a/analysis/.dockerignore b/analysis/.dockerignore new file mode 100644 index 0000000000000000000000000000000000000000..9a715f53b8129933fe1b20baa4af20772de3c872 --- /dev/null +++ b/analysis/.dockerignore @@ -0,0 +1,2 @@ +.dockerignore +Dockerfile \ No newline at end of file diff --git a/analysis/Dockerfile b/analysis/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..1e396697f34f86e578890cbb68b7a8d40a21ebf8 --- /dev/null +++ b/analysis/Dockerfile @@ -0,0 +1,7 @@ +FROM jupyter/base-notebook + +COPY . /home/jovyan + +WORKDIR /home/jovyan +RUN rm -r work +RUN pip install -r requirements.txt diff --git a/analysis/README.md b/analysis/README.md index 263b1db16fcabefe5409ebe744afe5997bc90d89..3c96cf0b6e67a60ebbb4c610ca69fcbcb27876a0 100644 --- a/analysis/README.md +++ b/analysis/README.md @@ -3,20 +3,44 @@ This directory contains Jupyter notebooks for analyzing and visualizing benchmark execution results and plotting. The following notebooks are provided: +* [demand-metric.ipynb](demand-metric.ipynb): Create CSV files describing scalability according to the Theodolite `demand` metric. +* [demand-metric-plot.ipynb](demand-metric-plot.ipynb): Create plots based on such CSV files of the `demand` metric. + +For legacy reasons, we also provide the following notebooks, which, however, are not documented: + * [scalability-graph.ipynb](scalability-graph.ipynb): Creates a scalability graph for a certain benchmark execution. 
* [scalability-graph-final.ipynb](scalability-graph-final.ipynb): Combines the scalability graphs of multiple benchmarks executions (e.g. for comparing different configuration). * [lag-trend-graph.ipynb](lag-trend-graph.ipynb): Visualizes the consumer lag evaluation over time along with the computed trend. ## Usage -For executing benchmarks and analyzing their results, a **Python 3.7** -installation is required (e.g., in a virtual environment). Our notebooks require some -Python libraries, which can be installed via: +In general, the Theodolite Analysis Jupyter notebooks should be runnable by any Jupyter server. To make it a bit easier, +we provide introductions for running notebooks with Docker and with Visual Studio Code. These intoduction may also be +a good starting point for using another service. + +For analyzing and visualizing benchmark results, either Docker or a Jupyter installation with Python 3.7 or 3.8 is +required (e.g., in a virtual environment). **Please note that Python 3.9 seems not to be working as not all our +dependencies are ported to Python 3.9 yet.** + +### Running with Docker + +This option requires Docker to be installed. You can build and run a container using the following commands. Make sure +to set the `results` volume to the directory with your execution results and `results-inst` to a directory where the +final scalability graphs should be placed. The output of the *run* command gives you an URL of the form +`http://127.0.0.1:8888/?token=...`, which you should open in your webbrowser. From there you can access all notebooks. +You can stop the Jupyter server with Crtl + C. ```sh -pip install -r requirements.txt +docker build . 
-t theodolite-analysis +docker run --rm -p 8888:8888 -v "$PWD/../results":/home/jovyan/results -v "$PWD/../results-inst":/home/jovyan/results-inst theodolite-analysis ``` -We have tested these -notebooks with [Visual Studio Code](https://code.visualstudio.com/docs/python/jupyter-support), -however, every other server should be fine as well. +### Running with Visual Studio Code + +The [Visual Studio Code Documentation](https://code.visualstudio.com/docs/python/jupyter-support) shows to run Jupyter +notebooks with Visual Studio Code. For our notebooks, Python 3.7 or newer is required (e.g., in a virtual environment). +Moreover, they require some Python libraries, which can be installed by: + +```sh +pip install -r requirements.txt +``` \ No newline at end of file diff --git a/analysis/demand-metric-plot.ipynb b/analysis/demand-metric-plot.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..95f371510bbcc8af785739c50bce42e969ea2b80 --- /dev/null +++ b/analysis/demand-metric-plot.ipynb @@ -0,0 +1,173 @@ +{ + "cells": [ + { + "source": [ + "# Theodolite Analysis - Plotting the Demand Metric\n", + "\n", + "This notebook creates a plot, showing scalability as a function that maps load intensities to the resources required for processing them. It is able to combine multiple such plots in one figure, for example, to compare multiple systems or configurations.\n", + "\n", + "The notebook takes a CSV file for each plot mapping load intensities to minimum required resources, computed by the `demand-metric-plot.ipynb` notebook." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "source": [ + "First, we need to import some libraries, which are required for creating the plots." 
+ ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import pandas as pd\n", + "from functools import reduce\n", + "import matplotlib.pyplot as plt\n", + "from matplotlib.ticker import FuncFormatter\n", + "from matplotlib.ticker import MaxNLocator" + ] + }, + { + "source": [ + "We need to specify the directory, where the demand CSV files can be found, and a dictionary that maps a system description (e.g. its name) to the corresponding CSV file (prefix). " + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results_dir = '<path-to>/results'\n", + "\n", + "experiments = {\n", + " 'System XYZ': 'exp200',\n", + "}\n" + ] + }, + { + "source": [ + "Now, we combie all systems described in `experiments`." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataframes = [pd.read_csv(os.path.join(results_dir, f'{v}_demand.csv')).set_index('load').rename(columns={\"resources\": k}) for k, v in experiments.items()]\n", + "\n", + "df = reduce(lambda df1,df2: df1.join(df2,how='outer'), dataframes)" + ] + }, + { + "source": [ + "We might want to display the mappings before we plot it." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df" + ] + }, + { + "source": [ + "The following code creates a MatPlotLib figure showing the scalability plots for all specified systems. You might want to adjust its styling etc. according to your preferences. Make sure to also set a filename." 
+ ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.style.use('ggplot')\n", + "plt.rcParams['axes.facecolor']='w'\n", + "plt.rcParams['axes.edgecolor']='555555'\n", + "#plt.rcParams['ytick.color']='black'\n", + "plt.rcParams['grid.color']='dddddd'\n", + "plt.rcParams['axes.spines.top']='false'\n", + "plt.rcParams['axes.spines.right']='false'\n", + "plt.rcParams['legend.frameon']='true'\n", + "plt.rcParams['legend.framealpha']='1'\n", + "plt.rcParams['legend.edgecolor']='1'\n", + "plt.rcParams['legend.borderpad']='1'\n", + "\n", + "@FuncFormatter\n", + "def load_formatter(x, pos):\n", + " return f'{(x/1000):.0f}k'\n", + "\n", + "markers = ['s', 'D', 'o', 'v', '^', '<', '>', 'p', 'X']\n", + "\n", + "def splitSerToArr(ser):\n", + " return [ser.index, ser.as_matrix()]\n", + "\n", + "plt.figure()\n", + "#plt.figure(figsize=(4.8, 3.6)) # For other plot sizes\n", + "#ax = df.plot(kind='line', marker='o')\n", + "for i, column in enumerate(df):\n", + " plt.plot(df[column].dropna(), marker=markers[i], label=column)\n", + "plt.legend()\n", + "ax = plt.gca()\n", + "#ax = df.plot(kind='line',x='dim_value', legend=False, use_index=True)\n", + "ax.set_ylabel('number of instances')\n", + "ax.set_xlabel('messages/second')\n", + "ax.set_ylim(ymin=0)\n", + "#ax.set_xlim(xmin=0)\n", + "ax.yaxis.set_major_locator(MaxNLocator(integer=True))\n", + "ax.xaxis.set_major_formatter(FuncFormatter(load_formatter))\n", + "\n", + "plt.savefig('temp.pdf', bbox_inches='tight')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "language_info": { + "name": "python", + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "version": "3.8.5-final" + }, + "orig_nbformat": 2, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "npconvert_exporter": "python", + 
"pygments_lexer": "ipython3", + "version": 3, + "kernelspec": { + "name": "python37064bitvenvvenv6c432ee1239d4f3cb23f871068b0267d", + "display_name": "Python 3.7.0 64-bit ('.venv': venv)", + "language": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} \ No newline at end of file diff --git a/analysis/demand-metric.ipynb b/analysis/demand-metric.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..525bde211afcabeecf52f1e88f3c91c02a77a152 --- /dev/null +++ b/analysis/demand-metric.ipynb @@ -0,0 +1,119 @@ +{ + "cells": [ + { + "source": [ + "# Theodolite Analysis - Demand Metric\n", + "\n", + "This notebook allows applies Theodolite's *demand* metric to describe scalability of a SUT based on Theodolite measurement data.\n", + "\n", + "Theodolite's *demand* metric is a function, mapping load intensities to the minimum required resources (e.g., instances) that are required to process this load. With this notebook, the *demand* metric function is approximated by a map of tested load intensities to their minimum required resources.\n", + "\n", + "The final output when running this notebook will be a CSV file, providig this mapping. It can be used to create nice plots of a system's scalability using the `demand-metric-plot.ipynb` notebook." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "source": [ + "In the following cell, we need to specifiy:\n", + "\n", + "* `exp_id`: The experiment id that is to be analyzed.\n", + "* `warmup_sec`: The number of seconds which are to be ignored in the beginning of each experiment.\n", + "* `max_lag_trend_slope`: The maximum tolerable increase in queued messages per second.\n", + "* `measurement_dir`: The directory where the measurement data files are to be found.\n", + "* `results_dir`: The directory where the computed demand CSV files are to be stored." 
+ ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exp_id = 200\n", + "warmup_sec = 60\n", + "max_lag_trend_slope = 2000\n", + "measurement_dir = '<path-to>/measurements'\n", + "results_dir = '<path-to>/results'\n" + ] + }, + { + "source": [ + "With the following call, we compute our demand mapping." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from src.demand import demand\n", + "\n", + "demand = demand(exp_id, measurement_dir, max_lag_trend_slope, warmup_sec)" + ] + }, + { + "source": [ + "We might already want to plot a simple visualization here:" + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "demand.plot(kind='line',x='load',y='resources')" + ] + }, + { + "source": [ + "Finally we store the results in a CSV file." 
import os

import numpy as np
import pandas as pd


def demand(exp_id, directory, threshold, warmup_sec):
    """Compute the Theodolite *demand* metric from total-lag measurement files.

    Scans *directory* for files named ``exp<exp_id>_<uc>_<load>_<instances>_totallag.csv``,
    fits a least-squares line to each run's measurements (after discarding a
    warm-up period) and maps each tested load intensity to the minimum number
    of instances whose lag trend slope stays below *threshold*.

    :param exp_id: experiment id; only files starting with ``exp<exp_id>`` are read.
    :param directory: directory containing the ``*totallag.csv`` measurement files.
    :param threshold: maximum tolerable lag trend slope for a run to count as suitable.
    :param warmup_sec: seconds at the start of each run to exclude from the fit.
    :return: DataFrame with columns ``load`` and ``resources``, one row per load,
        sorted by load (the demand metric as a load -> min-resources mapping).
    """
    raw_runs = []

    # One file per tested (load, instances) configuration.
    filenames = [
        f for f in os.listdir(directory)
        if f.startswith(f"exp{exp_id}") and f.endswith("totallag.csv")
    ]
    for filename in filenames:
        # File name layout: exp<id>_<uc>_<load>_<instances>_totallag.csv
        run_params = filename[:-4].split("_")
        dim_value = run_params[2]  # load intensity of this run
        instances = run_params[3]  # number of instances of this run

        run = pd.read_csv(os.path.join(directory, filename))
        # Seconds since the start of this run, used to skip the warm-up phase.
        run['sec_start'] = run.loc[0:, 'timestamp'] - run.iloc[0]['timestamp']
        regress = run.loc[run['sec_start'] >= warmup_sec]

        # Fit a least-squares line through the measurements. The axes are taken
        # positionally from the 3rd and 4th columns, as in the original code —
        # assumes the CSV layout puts the regression axes there (TODO confirm).
        # np.polyfit(deg=1) yields the same OLS slope sklearn's
        # LinearRegression did, without the heavyweight dependency.
        x = regress.iloc[:, 2].values
        y = regress.iloc[:, 3].values
        trend_slope = np.polyfit(x, y, 1)[0]

        raw_runs.append({
            'load': int(dim_value),
            'resources': int(instances),
            'trend_slope': trend_slope,
        })

    runs = pd.DataFrame(raw_runs)

    # A configuration meets the SLO iff its lag trend slope is below the threshold.
    runs["suitable"] = runs["trend_slope"] < threshold

    # Bug fix: the original called sort_values() without keeping the result,
    # so the intended sort never happened. (Harmless for the return value,
    # since groupby sorts by key, but the intent is now actually executed.)
    runs = runs.sort_values(by=["load", "resources"])

    # Keep only SLO-compliant runs, then take the minimum resources per load:
    # that mapping is the demand metric.
    filtered = runs[runs["suitable"]]
    grouped = filtered.groupby(['load'])['resources'].min()
    demand_per_load = grouped.to_frame().reset_index()

    return demand_per_load
a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ef1ece3549b1aabf60a4ff5b15028b7e50288cd9..89bd3147f0d3bb7a5fecc5d8c7d277bd294494ad 100644 --- a/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/benchmarks/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder { * * @return A {@code Topology} for a {@code KafkaStreams} application. */ - protected abstract Topology buildTopology(); + protected abstract Topology buildTopology(Properties properties); /** * Builds the {@link KafkaStreams} instance. */ public KafkaStreams build() { // Create the Kafka streams instance. - return new KafkaStreams(this.buildTopology(), this.buildProperties()); + final Properties properties = this.buildProperties(); + return new KafkaStreams(this.buildTopology(properties), properties); } } diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 1c30e0c2c83b3d8a2f3dca4df0c7aec99cc4f450..75c833aa722654395b1adc6f739395eea5256820 100644 --- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import com.google.gson.Gson; +import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; @@ -36,7 +37,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the 
History microservice. */ - public Topology build() { + public Topology build(final Properties properties) { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), @@ -44,6 +45,6 @@ public class TopologyBuilder { .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 14335282863bff5a170716b228ea363e3d739685..cc39bb04623c06a4d41cb2c695804ed41818a67c 100644 --- a/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); return new TopologyBuilder(this.inputTopic, - new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties); } } diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index 6a254d9b75ae3d9d39cf9dd887f6d4fccb6119c4..74e9bb99b80efec4c27d7eb50668d622a5d951f9 100644 --- 
a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; +import java.util.Properties; import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -59,9 +60,9 @@ public class TopologyBuilder { final Duration emitPeriod, final Duration gracePeriod, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; + this.outputTopic = outputTopic; this.feedbackTopic = feedbackTopic; this.configurationTopic = configurationTopic; - this.outputTopic = outputTopic; this.emitPeriod = emitPeriod; this.gracePeriod = gracePeriod; @@ -71,7 +72,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the Aggregation microservice. */ - public Topology build() { + public Topology build(final Properties properties) { // 1. Build Parent-Sensor Table final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); @@ -92,7 +93,7 @@ public class TopologyBuilder { // 5. 
Expose Aggregations Stream this.exposeOutputStream(aggregations); - return this.builder.build(); + return this.builder.build(properties); } private KTable<String, ActivePowerRecord> buildInputTable() { diff --git a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 1a606ee3df5e6ac2f43b650afe4a9aed036df9cd..7e077b101c0e1bfab359fc347ffe8c4acc9b88fc 100644 --- a/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/benchmarks/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); @@ -59,14 +60,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, - this.feedbackTopic, this.outputTopic, + this.feedbackTopic, this.configurationTopic, this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, this.gracePeriod == null ? 
GRACE_PERIOD_DEFAULT : this.gracePeriod, new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index 74eed74c52a78df229c02542bc6e66d7f796c2c7..d6d6d4ffb7ebb1236be73dd681c900311853e732 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; +import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -46,7 +47,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the History microservice. 
*/ - public Topology build() { + public Topology build(final Properties properties) { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), @@ -68,6 +69,6 @@ public class TopologyBuilder { .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 9ab4ea0a96c663af09008bd5358066ca3f8520ac..70113271a9d3c23499b85c07bf9d0a76db59f820 100644 --- a/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/benchmarks/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } } diff --git 
a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index a92abae6e11c4bf66a5d8d8dee0f10b088e8274b..a0c87ba4702b9c3f191291a3f04679cc73fcb04b 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Properties; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -54,7 +55,7 @@ public class TopologyBuilder { /** * Build the {@link Topology} for the History microservice. */ - public Topology build() { + public Topology build(final Properties properties) { final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); @@ -89,6 +90,6 @@ public class TopologyBuilder { Serdes.String())); // this.serdes.avroValues())); - return this.builder.build(); + return this.builder.build(properties); } } diff --git a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index bbbb043119857612b1a8b0c60e3a5466cd68447e..67c652967194f59db560b8ad6fd86410725b3c9c 100644 --- a/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/benchmarks/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; import java.time.Duration; import java.util.Objects; +import java.util.Properties; import 
org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; @@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { } @Override - protected Topology buildTopology() { + protected Topology buildTopology(final Properties properties) { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); @@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { this.aggregtionDuration, this.aggregationAdvance); - return topologyBuilder.build(); + return topologyBuilder.build(properties); } } diff --git a/execution/.gitlab-ci.yml b/execution/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..5577de7a083708a6bb9b83571f458e2c1fbfb340 --- /dev/null +++ b/execution/.gitlab-ci.yml @@ -0,0 +1,61 @@ +stages: + - deploy + +deploy: + stage: deploy + tags: + - exec-dind + image: docker:19.03.1 + services: + - docker:19.03.1-dind + variables: + DOCKER_TLS_CERTDIR: "/certs" + script: + - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') + - docker build --pull -t theodolite ./execution + - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:${DOCKER_TAG_NAME}latest" + - "[ ! 
$CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA" + - "[ $CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:$CI_COMMIT_TAG" + - echo $DOCKERHUB_PW | docker login -u $DOCKERHUB_ID --password-stdin + - docker push $DOCKERHUB_ORG/theodolite + - docker logout + rules: + - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $CI_COMMIT_TAG" + when: always + - changes: + - execution/**/* + if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW" + when: always + - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW" + when: manual + allow_failure: true + +deploy-ghcr: + stage: deploy + tags: + - exec-dind + image: docker:19.03.1 + services: + - docker:19.03.1-dind + variables: + DOCKER_TLS_CERTDIR: "/certs" + script: + - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') + - docker build --pull -t theodolite ./execution + - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:${DOCKER_TAG_NAME}latest" + - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA" + - "[ $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:$CI_COMMIT_TAG" + - echo $GITHUB_CR_TOKEN | docker login ghcr.io -u $GITHUB_CR_USER --password-stdin + - docker push ghcr.io/$GITHUB_CR_ORG/theodolite + - docker logout + rules: + - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $CI_COMMIT_TAG" + when: always + - changes: + - execution/**/* + if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN" + when: always + - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN" + when: manual + allow_failure: true + \ No newline at end of file diff --git a/execution/README.md b/execution/README.md index a77eecd20e0c30a98ee1fb5af4496d653458e59e..358ce270400d1e4e4947a8ef736feac74c314163 100644 --- a/execution/README.md +++ b/execution/README.md @@ -8,45 +8,28 @@ benchmarks](#execution). 
## Installation -### Kubernetes Cluster +For executing benchmarks, access to a Kubernetes cluster is required. If you already run other applications inside your +cluster, you might want to consider creating a dedicated namespace for your benchmarks. -For executing benchmarks, access to Kubernetes cluster is required. We suggest -to create a dedicated namespace for executing your benchmarks. The following -services need to be available as well. +### Installing Dependencies -### Kubernetes Volume - -For executing the benchmark as a Kubernetes job it is required to use a volume to store the results of the executions. -In `infrastructure/kubernetes` are two files for creating a volume. -Either one of them should be used. - -The `volumeSingle.yaml` is meant for systems where Kubernetes is run locally (e.g. minikube, kind etc.). -However, you can also use the other file. -In `volumeSingle.yaml` you need to set `path` to the path on your machine where the results should be stored. - -The `volumeCluster.yaml` should be used when Kubernetes runs in the cloud. -In the `nodeAffinity` section you need to exchange `<node-name>` to the name of the node where the volume should be created (this node will most likely execute also the job). -However, you can also set a different `nodeAffinity`. -Further you need to set `path` to the path on the node where the results should be stored. - -After setting the properties you can create the volume with: - -```sh -kubectl apply -f infrastructure/kubernetes/volume(Single|Cluster).yaml -``` +The following third-party services need to be installed in your cluster. For most of them, the suggested way to install +them is via [Helm](https://helm.sh). #### Prometheus We suggest to use the [Prometheus Operator](https://github.com/coreos/prometheus-operator) and create a dedicated Prometheus instance for these benchmarks. 
-If Prometheus Operator is not already available on your cluster, a convenient -way to install is via the [**unofficial** Prometheus Operator Helm chart](https://github.com/helm/charts/tree/master/stable/prometheus-operator). -As you may not need an entire cluster monitoring stack, you can use our Helm -configuration to only install the operator: +If Prometheus Operator is not already available on your cluster, a convenient way to install it is via the +[Prometheus community Helm chart](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack). +As you may not need an entire cluster monitoring stack, you can use our Helm configuration to only install the +operator: ```sh -helm install prometheus-operator stable/prometheus-operator -f infrastructure/prometheus/helm-values.yaml +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo update +helm install prometheus-operator prometheus-community/kube-prometheus-stack -f infrastructure/prometheus/helm-values.yaml ``` After installation, you need to create a Prometheus instance: @@ -55,9 +38,17 @@ After installation, you need to create a Prometheus instance: kubectl apply -f infrastructure/prometheus/prometheus.yaml ``` -You might also need to apply the [ServiceAccount](infrastructure/prometheus/service-account.yaml), [ClusterRole](infrastructure/prometheus/cluster-role.yaml) -and the [CusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml), -depending on your cluster's security policies. +You might also need to apply the [ClusterRole](infrastructure/prometheus/cluster-role.yaml), the +[CusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml) and the +[ServiceAccount](infrastructure/prometheus/service-account.yaml), depending on your cluster's security +policies. 
If you are not in the *default* namespace, alter the namespace in +[Prometheus' ClusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml) accordingly. + +```sh +kubectl apply -f infrastructure/prometheus/cluster-role.yaml +kubectl apply -f infrastructure/prometheus/cluster-role-binding.yaml +kubectl apply -f infrastructure/prometheus/service-account.yaml +``` For the individual benchmarking components to be monitored, [ServiceMonitors](https://github.com/coreos/prometheus-operator#customresourcedefinitions) are used. See the corresponding sections below for how to install them. @@ -68,14 +59,16 @@ As with Prometheus, we suggest to create a dedicated Grafana instance. Grafana with our default configuration can be installed with Helm: ```sh -helm install grafana stable/grafana -f infrastructure/grafana/values.yaml +helm repo add grafana https://grafana.github.io/helm-charts +helm repo update +helm install grafana grafana/grafana -f infrastructure/grafana/values.yaml ``` The official [Grafana Helm Chart repository](https://github.com/helm/charts/tree/master/stable/grafana) provides further documentation including a table of configuration options. -We provide ConfigMaps for a [Grafana dashboard](infrastructure/grafana/dashboard-config-map.yaml) and a [Grafana data source](infrastructure/grafana/prometheus-datasource-config-map.yaml). -Create them as follows: +We provide ConfigMaps for a [Grafana dashboard](infrastructure/grafana/dashboard-config-map.yaml) and a +[Grafana data source](infrastructure/grafana/prometheus-datasource-config-map.yaml). Create them as follows: ```sh kubectl apply -f infrastructure/grafana/dashboard-config-map.yaml @@ -102,6 +95,9 @@ kubectl apply -f infrastructure/kafka/service-monitor.yaml Other Kafka deployments, for example, using Strimzi, should work in a similar way. 
+*Please note that currently, even if installed differently, the corresponding services must run at +*my-confluent-cp-kafka:9092*, *my-confluent-cp-zookeeper:2181* and *my-confluent-cp-schema-registry:8081*. + #### A Kafka Client Pod A permanently running pod used for Kafka configuration is started via: @@ -128,71 +124,91 @@ To install it: helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.3/kafka-lag-exporter-0.6.3.tgz -f infrastructure/kafka-lag-exporter/values.yaml ``` +### Installing Theodolite -### Python 3.7 (Only required for local Execution Control) +While Theodolite itself has not be installed as it is loaded at runtime (see [execution](#Execution)), it requires some +resources to be deployed in your cluster. These resources are grouped under RBAC and Volume in the following paragraphs. -For executing benchmarks, a **Python 3.7** installation is required. We suggest -to use a virtual environment placed in the `.venv` directory (in the Theodolite -root directory). As set of requirements is needed. You can install them with the following -command (make sure to be in your virtual environment if you use one): +#### Theodolite RBAC + +**The following step is only required if RBAC is enabled in your cluster.** If you are not sure whether this is the +case, you want to simply try it without the following step. + +If RBAC is enabled in your cluster, you have to allow Theodolite to start and stop pods etc. To do so, deploy the RBAC +resources via: ```sh -pip install -r requirements.txt +kubectl apply -f infrastructure/kubernetes/rbac/role.yaml +kubectl apply -f infrastructure/kubernetes/rbac/role-binding.yaml +kubectl apply -f infrastructure/kubernetes/rbac/service-account.yaml ``` +#### Theodolite Volume -### Required Manual Adjustments +In order to persistently store benchmark results, Theodolite needs a volume mounted. We provide pre-configured +declarations for different volume types. 
-Depending on your setup, some additional adjustments may be necessary: +##### *hostPath* volume -* Change Kafka and Zookeeper servers in the Kubernetes deployments (uc1-application etc.) and `run_XX.sh` scripts -* Change the name of your Kubernetes namespace for [Prometheus' ClusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml) -* *Please let us know if there are further adjustments necessary* +Using a [hostPath volume](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) is the easiest option when +running Theodolite locally, e.g., with minikube or kind. +Just modify `infrastructure/kubernetes/volume-hostpath.yaml` by setting `path` to the directory on your host machine where +all benchmark results should be stored and run: +```sh +kubectl apply -f infrastructure/kubernetes/volume-hostpath.yaml +``` -## Execution +##### *local* volume -You can either execute the Execution Control on your machine or also deploy the Execution control in Kubernetes. +A [local volume](https://kubernetes.io/docs/concepts/storage/volumes/#local) is a simple option to use when having +access (e.g. via SSH) to one of your cluster nodes. -### Local Execution +You first need to create a directory on a selected node where all benchmark results should be stored. Next, modify +`infrastructure/kubernetes/volume-local.yaml` by setting `<node-name>` to your selected node. (This node will most +likely also execute the [Theodolite job](#Execution).) Further, you have to set `path` to the directory on the node you just created. To deploy +you volume run: -Please note that a **Python 3.7** installation is required for executing Theodolite. +```sh +kubectl apply -f infrastructure/kubernetes/volume-local.yaml +``` -The `theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows: +##### Other volumes -```python -python theodolite.py --uc <uc> --loads <load> [<load> ...] --instances <instances> [<instances> ...] 
-``` +To use volumes provided by public cloud providers or network-based file systems, you can use the definitions in +`infrastructure/kubernetes/` as a starting point. See the offical +[volumes documentation](https://kubernetes.io/docs/concepts/storage/volumes/) for additional information. -The command above is the minimal command for execution. -Further configurations options are described [below](#configuration) or available via `python theodolite.py -h` -### Kubernetes Execution +## Execution -The Execution Control will be run by a Kubernetes Job. -This Job creates a pod that will execute the Executuion Control. -To configure the parameters, the `theodolite.yaml` need to be changed. -For the options take a look at [configuration](#configuration). +The preferred way to run scalability benchmarks with Theodolite is to deploy Theodolite +[Kubernetes Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/) in your cluster. For running +Theodolite locally on your machine see the description below. -To start the Benchmark the following command need to be executed: -```sh -kubectl apply -f theodolite.yaml -``` +`theodolite.yaml` provides a template for your own Theodolite job. To run your own job, create a copy, give it a name +(`metadata.name`) and adjust configuration parameters as desired. For a description of available configuration options +see the [Configuration](#configuration) section below. Note, that you might uncomment the `serviceAccountName` line if +RBAC is enabled on your cluster (see installation of [Theodolite RBAC](#Theodolite-RBAC)). -With `kubectl logs -f theodolite-<*>` you can show the log of the execution control. +To start the execution of a benchmark run (with `<your-theodolite-yaml>` being your job definition): -When the job is finished, your results should be in your mounted [Kubernetes volume](#kubernetes-volume). -In order to start a new benchmark, the old job needs to be deleted. 
-This can be done with: ```sh -kubectl delete -f theodolite.yaml +kubectl create -f <your-theodolite-yaml> ``` +This will create a pod with a name such as `your-job-name-xxxxxx`. You can verifiy this via `kubectl get pods`. With +`kubectl logs -f <your-job-name-xxxxxx>`, you can follow the benchmark execution logs. + +Once your job is completed (you can verify via `kubectl get jobs), its results are stored inside your configured +Kubernetes volume. + +**Make sure to always run only one Theodolite job at a time.** ### Configuration -| Python | Kubernetes | Description | +| Command line | Kubernetes | Description | | -------------------- | ------------------- | ------------------------------------------------------------ | | --uc | UC | **[Mandatory]** Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`. | | --loads | LOADS | **[Mandatory]** Values for the workload generator to be tested, should be sorted in ascending order. | @@ -208,6 +224,7 @@ kubectl delete -f theodolite.yaml | --prometheus | PROMETHEUS_BASE_URL | Defines where to find the prometheus instance. *Default:* `http://localhost:9090` | | --path | RESULT_PATH | A directory path for the results. Relative to the Execution folder. *Default:* `results` | | --configurations | CONFIGURATIONS | Defines environment variables for the use cases and, thus, enables further configuration options. | +| --threshold | THRESHOLD | The threshold for the trend slop that the search strategies use to determine that a load could be handled. *Default:* `2000` | ### Domain Restriction @@ -219,8 +236,45 @@ For dimension value, we have a domain of the amounts of instances. As a conseque * If the dimension value is not the smallest dimension value and N is the amount of minimal amount of instances that was suitable for the last smaller dimension value the domain for this dimension value contains all amounts of instances greater than, or equal to N. 
### Benchmarking Search Strategies + There are the following benchmarking strategies: * `check-all`: For each dimension value, execute one lag experiment for all amounts of instances within the current domain. * `linear-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is from the lowest number of instances to the highest amount of instances and the execution for each dimension value is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful. * `binary-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is in a binary-search-like manner. The execution is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful. + +## Observation + +The installed Grafana instance provides a dashboard to observe the benchmark execution. Unless configured otherwise, +this dashboard can be accessed via `http://<cluster-ip>:31199` or via `http://localhost:31199` if proxied with +`kubectl port-forward svc/grafana 8080:service`. Default credentials are user *admin* with password *admin*. + + +## Local Execution (e.g. for Development) + +As an alternative to executing Theodolite as a Kubernetes Job, it is also possible to run it from your local system, +for example, for development purposes. In addition to the generel installation instructions, the following adjustments +are neccessary. + +### Installation + +For local execution a **Python 3.7** installation is required. We suggest to use a virtual environment placed in the `.venv` +directory (in the Theodolite root directory). A set of requirements is needed. 
You can install them with the following +command (make sure to be in your virtual environment if you use one): + +```sh +pip install -r requirements.txt +``` + +Kubernetes volumes and service accounts, roles, and role bindings for Theodolite are not required in this case. + +### Local Execution + +The `theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows: + +```python +python theodolite.py --uc <uc> --loads <load> [<load> ...] --instances <instances> [<instances> ...] +``` + +This command is the minimal command for execution. Further configurations options are described [above](#configuration) +or available via `python theodolite.py -h`. \ No newline at end of file diff --git a/execution/infrastructure/grafana/values.yaml b/execution/infrastructure/grafana/values.yaml index 211a72a61a2699c7108ec4adb9a7edebbccecb69..562516ad76f9a0f88c0db8557da51178dbbc9871 100644 --- a/execution/infrastructure/grafana/values.yaml +++ b/execution/infrastructure/grafana/values.yaml @@ -11,7 +11,9 @@ adminPassword: admin ## Sidecars that collect the configmaps with specified label and stores the included files them into the respective folders ## Requires at least Grafana 5 to work and can't be used together with parameters dashboardProviders, datasources and dashboards sidecar: - image: kiwigrid/k8s-sidecar:0.1.99 + image: + repository: "kiwigrid/k8s-sidecar" + tag: "1.1.0" imagePullPolicy: IfNotPresent dashboards: enabled: true diff --git a/execution/infrastructure/kafka-lag-exporter/values.yaml b/execution/infrastructure/kafka-lag-exporter/values.yaml index b83a911283a7e8264f982f9eb5d550ad5497ec9d..8e53454345df75b55d5d36799dd0b0f0f75233a0 100644 --- a/execution/infrastructure/kafka-lag-exporter/values.yaml +++ b/execution/infrastructure/kafka-lag-exporter/values.yaml @@ -1,3 +1,6 @@ +image: + pullPolicy: IfNotPresent + clusters: - name: "my-confluent-cp-kafka" bootstrapBrokers: "my-confluent-cp-kafka:9092" diff --git 
a/execution/infrastructure/kafka/values.yaml b/execution/infrastructure/kafka/values.yaml index 1efbda0515d0a9c881552cb63293ca8cc28c98b2..e65a5fc567d39c7389479d406fa9e6d7156b0f0a 100644 --- a/execution/infrastructure/kafka/values.yaml +++ b/execution/infrastructure/kafka/values.yaml @@ -55,6 +55,7 @@ cp-kafka: # "min.insync.replicas": 2 "auto.create.topics.enable": false "log.retention.ms": "10000" # 10s + #"log.retention.ms": "86400000" # 24h "metrics.sample.window.ms": "5000" #5s ## ------------------------------------------------------ diff --git a/execution/infrastructure/kubernetes/volumeSingle.yaml b/execution/infrastructure/kubernetes/volume-hostpath.yaml similarity index 100% rename from execution/infrastructure/kubernetes/volumeSingle.yaml rename to execution/infrastructure/kubernetes/volume-hostpath.yaml diff --git a/execution/infrastructure/kubernetes/volumeCluster.yaml b/execution/infrastructure/kubernetes/volume-local.yaml similarity index 100% rename from execution/infrastructure/kubernetes/volumeCluster.yaml rename to execution/infrastructure/kubernetes/volume-local.yaml diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py index eaebaa6cc99959bc8a41e50f3d6a63acaf5ab817..de609bc55e21e9467a2b28168be6e478171cfddd 100644 --- a/execution/lib/cli_parser.py +++ b/execution/lib/cli_parser.py @@ -136,6 +136,11 @@ def benchmark_parser(description): metavar='<strategy>', default=os.environ.get('SEARCH_STRATEGY', 'default'), help='The benchmarking search strategy. 
Can be set to default, linear-search or binary-search') + parser.add_argument('--threshold', + type=int, + metavar='<threshold>', + default=os.environ.get('THRESHOLD', 2000), + help='The threshold for the trend slop that the search strategies use to determine that a load could be handled') return parser diff --git a/execution/lib/trend_slope_computer.py b/execution/lib/trend_slope_computer.py index 294226c35c0038a01804f7f5e8eb3a1e53c79b79..90ae26cfd275f53307e19532f047e5e0a9326d3a 100644 --- a/execution/lib/trend_slope_computer.py +++ b/execution/lib/trend_slope_computer.py @@ -2,7 +2,7 @@ from sklearn.linear_model import LinearRegression import pandas as pd import os -def compute(directory, filename, warmup_sec, threshold): +def compute(directory, filename, warmup_sec): df = pd.read_csv(os.path.join(directory, filename)) input = df input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp'] @@ -16,4 +16,4 @@ def compute(directory, filename, warmup_sec, threshold): trend_slope = linear_regressor.coef_[0][0] - return trend_slope \ No newline at end of file + return trend_slope diff --git a/execution/run_uc.py b/execution/run_uc.py index 56d6d0fcf4f2f6edd49c6692e6e9de0bbbfefb47..a0fcdbb6d57e5dc67d18e69b7d07fcdbfa809307 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -282,8 +282,16 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome :param int execution_minutes: How long the use case where executed. 
""" print('Run evaluation function') - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, - execution_minutes, prometheus_base_url, result_path) + try: + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, + execution_minutes, prometheus_base_url, + result_path) + except Exception as e: + err_msg = 'Evaluation function failed' + print(err_msg) + logging.exception(err_msg) + print('Benchmark execution continues') + return diff --git a/execution/strategies/config.py b/execution/strategies/config.py index c3cd1ff82c4926f5efcc741b027996dbc800916b..d4df97c18ae54c7c181ddf08264c013f9447350f 100644 --- a/execution/strategies/config.py +++ b/execution/strategies/config.py @@ -18,5 +18,6 @@ class ExperimentConfig: configurations: dict domain_restriction_strategy: object search_strategy: object + threshold: int subexperiment_executor: object subexperiment_evaluator: object diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py index 8856ead0502279f8f8642da87cf56f794cb1b11c..46748cbda250597b3a7644522126268be4599293 100644 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -7,8 +7,9 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) - if result==1: # successful, the upper neighbor is assumed to also has 
been successful + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success: # successful, the upper neighbor is assumed to also has been successful return (lower, subexperiment_counter+1) else: # not successful return (lower+1, subexperiment_counter) @@ -16,15 +17,17 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) - if result==1: # minimal instances found + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success: # minimal instances found return (lower, subexperiment_counter) else: # not successful, check if lower+1 instances are sufficient print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) - if result == 1: # minimal instances found + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success: # minimal instances found return (upper, subexperiment_counter) 
else: return (upper+1, subexperiment_counter) @@ -34,8 +37,9 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) - if result == 1: # success -> search in (lower, mid-1) + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success: # success -> search in (lower, mid-1) return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1) else: # not success -> search in (mid+1, upper) return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1) diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py index 8e9d6c3ca0924d724c4f55032ebc24a92bc3ad93..0861945113b829fa79317d8a1a6312b4d6e4f71d 100644 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -2,23 +2,30 @@ import os from strategies.strategies.config import SubexperimentConfig + def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): - new_lower_replicas_bound_index=lower_replicas_bound_index - new_lower_replicas_bound_found=False - subexperiments_total=len(config.dim_values)*len(config.replicass) + new_lower_replicas_bound_index = lower_replicas_bound_index + new_lower_replicas_bound_found = False + subexperiments_total = len(config.dim_values) * len(config.replicass) while 
lower_replicas_bound_index < len(config.replicass): - subexperiment_counter+=1 - dim_value=config.dim_values[dim_value_index] - replicas=config.replicass[lower_replicas_bound_index] - print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") + subexperiment_counter += 1 + dim_value = config.dim_values[dim_value_index] + replicas = config.replicass[lower_replicas_bound_index] + print( + f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) + subexperiment_config = SubexperimentConfig( + config.use_case, config.exp_id, subexperiment_counter, dim_value, + replicas, config.partitions, config.cpu_limit, config.memory_limit, + config.execution_minutes, config.prometheus_base_url, config.reset, + config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) == 1 - if result == 1 and not new_lower_replicas_bound_found: + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success and not new_lower_replicas_bound_found: new_lower_replicas_bound_found = True new_lower_replicas_bound_index = lower_replicas_bound_index - lower_replicas_bound_index+=1 + lower_replicas_bound_index += 1 return (new_lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py index 
f2436658eec0bd4160259a09c272def40fbc130c..8e777303742e54cf2a11a1bde60e95b8aa85489d 100644 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -14,8 +14,9 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) - result = config.subexperiment_evaluator.execute(subexperiment_config) - if result == 1: + success = config.subexperiment_evaluator.execute(subexperiment_config, + config.threshold) + if success: return (lower_replicas_bound_index, subexperiment_counter) else: lower_replicas_bound_index+=1 diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py index 4e46d2d6ccabb601d9df373a540d23e73d60be28..30188de837746b76113ec635ca77fadc3a91cb92 100644 --- a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py +++ b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py @@ -1,17 +1,29 @@ -import os -import sys -import os import lib.trend_slope_computer as trend_slope_computer +import logging +import os -THRESHOLD = 2000 WARMUP_SEC = 60 -def execute(config): +def execute(config, threshold): + """ + Check the trend slope of the totallag of the subexperiment if it comes below + the threshold. + + :param config: Configuration of the subexperiment. + :param threshold: The threshold the trendslope need to come below. 
+ """ cwd = f'{os.getcwd()}/{config.result_path}' file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv" - trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD) + try: + trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC) + except Exception as e: + err_msg = 'Computing trend slope failed' + print(err_msg) + logging.exception(err_msg) + print('Mark this subexperiment as not successful and continue benchmark') + return False print(f"Trend Slope: {trend_slope}") - success = 0 if trend_slope > THRESHOLD else 1 - return success + + return trend_slope < threshold diff --git a/execution/theodolite.py b/execution/theodolite.py index 93717a2c90d874a90b11a4d3c6273b5ca1702687..bd273c4405e2a406b5b5537e084957625c19aa96 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -31,8 +31,8 @@ def load_variables(): def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, - duration, domain_restriction, search_strategy, prometheus_base_url, - reset, namespace, result_path, configurations): + duration, domain_restriction, search_strategy, threshold, + prometheus_base_url, reset, namespace, result_path, configurations): print( f"Domain restriction of search space activated: {domain_restriction}") @@ -107,6 +107,7 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, result_path=result_path, domain_restriction_strategy=domain_restriction_strategy, search_strategy=search_strategy_method, + threshold=threshold, subexperiment_executor=subexperiment_executor, subexperiment_evaluator=subexperiment_evaluator) @@ -119,10 +120,11 @@ if __name__ == '__main__': args = load_variables() if args.reset_only: print('Only reset the cluster') - run_uc.main(None, None, None, None, None, None, None, None, - None, None, args.namespace, None, None, reset_only=True) + run_uc.main(None, None, None, None, None, None, None, None, None, + None, args.namespace, None, None, 
reset_only=True) else: main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, args.memory_limit, args.duration, - args.domain_restriction, args.search_strategy, args.prometheus, - args.reset, args.namespace, args.path, args.configurations) + args.domain_restriction, args.search_strategy, + args.threshold, args.prometheus, args.reset, args.namespace, + args.path, args.configurations) diff --git a/execution/theodolite.yaml b/execution/theodolite.yaml index 68d53386bcf5e77ce08d964f3c04eb000794575c..06d14a0f589b2ac7a16ebaaae4d1490b840ea57b 100644 --- a/execution/theodolite.yaml +++ b/execution/theodolite.yaml @@ -11,7 +11,7 @@ spec: claimName: theodolite-pv-claim containers: - name: theodolite - image: bvonheid/theodolite:latest + image: ghcr.io/cau-se/theodolite:latest # imagePullPolicy: Never # Used to pull "own" local image env: - name: UC # mandatory