1from __future__ import annotations
  2
  3from typing import TYPE_CHECKING, Any
  4
  5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  6from plain.models.constants import LOOKUP_SEP
  7from plain.models.constraints import UniqueConstraint
  8from plain.models.expressions import F
  9from plain.models.fields import NOT_PROVIDED, DbParameters
 10from plain.models.fields.related import ForeignKeyField
 11
 12if TYPE_CHECKING:
 13    from collections.abc import Sequence
 14
 15    from plain.models.backends.mysql.base import MySQLDatabaseWrapper
 16    from plain.models.base import Model
 17    from plain.models.constraints import BaseConstraint
 18    from plain.models.fields import Field
 19    from plain.models.indexes import Index
 20
 21
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the MySQL backend (MySQL and MariaDB servers).

    Overrides SQL statement templates and hooks inherited from
    BaseDatabaseSchemaEditor. The templates below are %-style format strings
    with named placeholders such as ``%(table)s`` and ``%(column)s``.
    """

    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
    connection: MySQLDatabaseWrapper

    # Table-level statements.
    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column alteration fragments; these use MODIFY / ALTER COLUMN clauses and
    # carry no "ALTER TABLE" prefix of their own.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    # Unique constraints are dropped with DROP INDEX.
    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    # Fragment that starts with a comma, so it is meant to be appended to
    # another clause rather than executed on its own.
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    # Index statements.
    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    # Primary key statements; dropping the primary key takes no name.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
 52
 53    @property
 54    def sql_delete_check(self) -> str:
 55        if self.connection.mysql_is_mariadb:
 56            # The name of the column check constraint is the same as the field
 57            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 58            # crash. Constraint is removed during a "MODIFY" column statement.
 59            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 60        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 61
 62    @property
 63    def sql_rename_column(self) -> str:
 64        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 65        # "ALTER TABLE ... RENAME COLUMN" statement.
 66        if self.connection.mysql_is_mariadb:
 67            if self.connection.mysql_version >= (10, 5, 2):
 68                return super().sql_rename_column
 69        elif self.connection.mysql_version >= (8, 0, 4):
 70            return super().sql_rename_column
 71        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 72
 73    def quote_value(self, value: Any) -> str:
 74        self.connection.ensure_connection()
 75        if isinstance(value, str):
 76            value = value.replace("%", "%%")
 77        # MySQLdb escapes to string, PyMySQL to bytes.
 78        quoted = self.connection.connection.escape(
 79            value, self.connection.connection.encoders
 80        )
 81        if isinstance(value, str) and isinstance(quoted, bytes):
 82            quoted = quoted.decode()
 83        return quoted
 84
 85    def _is_limited_data_type(self, field: Field) -> bool:
 86        db_type = field.db_type(self.connection)
 87        return (
 88            db_type is not None
 89            and db_type.lower() in self.connection._limited_data_types
 90        )
 91
 92    def skip_default(self, field: Field) -> bool:
 93        if not self._supports_limited_data_type_defaults:
 94            return self._is_limited_data_type(field)
 95        return False
 96
 97    def skip_default_on_alter(self, field: Field) -> bool:
 98        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
 99            # MySQL doesn't support defaults for BLOB and TEXT in the
100            # ALTER COLUMN statement.
101            return True
102        return False
103
104    @property
105    def _supports_limited_data_type_defaults(self) -> bool:
106        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
107        if self.connection.mysql_is_mariadb:
108            return True
109        return self.connection.mysql_version >= (8, 0, 13)
110
111    def _column_default_sql(self, field: Field) -> str:
112        if (
113            not self.connection.mysql_is_mariadb
114            and self._supports_limited_data_type_defaults
115            and self._is_limited_data_type(field)
116        ):
117            # MySQL supports defaults for BLOB and TEXT columns only if the
118            # default value is written as an expression i.e. in parentheses.
119            return "(%s)"
120        return super()._column_default_sql(field)
121
122    def add_field(self, model: type[Model], field: Field) -> None:
123        super().add_field(model, field)
124
125        # Simulate the effect of a one-off default.
126        # field.default may be unhashable, so a set isn't used for "in" check.
127        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
128            effective_default = self.effective_default(field)
129            self.execute(
130                f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
131                [effective_default],
132            )
133
134    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
135        if (
136            isinstance(constraint, UniqueConstraint)
137            and constraint.create_sql(model, self) is not None
138        ):
139            self._create_missing_fk_index(
140                model,
141                fields=constraint.fields,
142                expressions=constraint.expressions,
143            )
144        super().remove_constraint(model, constraint)
145
146    def remove_index(self, model: type[Model], index: Index) -> None:
147        self._create_missing_fk_index(
148            model,
149            fields=[field_name for field_name, _ in index.fields_orders],
150            expressions=index.expressions,
151        )
152        super().remove_index(model, index)
153
154    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
155        if not super()._field_should_be_indexed(model, field):
156            return False
157
158        storage = self.connection.introspection.get_storage_engine(
159            self.connection.cursor(), model.model_options.db_table
160        )
161        # No need to create an index for ForeignKeyField fields except if
162        # db_constraint=False because the index from that constraint won't be
163        # created.
164        if (
165            storage == "InnoDB"
166            and isinstance(field, ForeignKeyField)
167            and field.db_constraint
168        ):
169            return False
170        return not self._is_limited_data_type(field)
171
172    def _create_missing_fk_index(
173        self,
174        model: type[Model],
175        *,
176        fields: Sequence[str],
177        expressions: Sequence[Any] | None = None,
178    ) -> None:
179        """
180        MySQL can remove an implicit FK index on a field when that field is
181        covered by another index. "covered" here means
182        that the more complex index has the FK field as its first field (see
183        https://bugs.mysql.com/bug.php?id=37910).
184
185        Manually create an implicit FK index to make it possible to remove the
186        composed index.
187        """
188        first_field_name = None
189        if fields:
190            first_field_name = fields[0]
191        elif (
192            expressions
193            and self.connection.features.supports_expression_indexes
194            and isinstance(expressions[0], F)
195            and LOOKUP_SEP not in expressions[0].name
196        ):
197            first_field_name = expressions[0].name
198
199        if not first_field_name:
200            return
201
202        first_field = model._model_meta.get_forward_field(first_field_name)
203        if first_field.get_internal_type() == "ForeignKeyField":
204            column = self.connection.introspection.identifier_converter(
205                first_field.column
206            )
207            with self.connection.cursor() as cursor:
208                constraint_names = [
209                    name
210                    for name, infodict in self.connection.introspection.get_constraints(
211                        cursor, model.model_options.db_table
212                    ).items()
213                    if infodict["index"] and infodict["columns"][0] == column
214                ]
215            # There are no other indexes that starts with the FK field, only
216            # the index that is expected to be deleted.
217            if len(constraint_names) == 1:
218                self.execute(
219                    self._create_index_sql(model, fields=[first_field], suffix="")
220                )
221
222    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
223        """
224        Keep the null property of the old field. If it has changed, it will be
225        handled separately.
226        """
227        if field.allow_null:
228            new_type += " NULL"
229        else:
230            new_type += " NOT NULL"
231        return new_type
232
233    def _alter_column_type_sql(
234        self,
235        model: type[Model],
236        old_field: Field,
237        new_field: Field,
238        new_type: str,
239    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
240        new_type = self._set_field_new_type_null_status(old_field, new_type)
241        return super()._alter_column_type_sql(model, old_field, new_field, new_type)
242
243    def _field_db_check(
244        self, field: Field, field_db_params: DbParameters
245    ) -> str | None:
246        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
247            10,
248            5,
249            2,
250        ):
251            return super()._field_db_check(field, field_db_params)
252        # On MySQL and MariaDB < 10.5.2 (no support for
253        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
254        # the column name as it requires explicit recreation when the column is
255        # renamed.
256        return field_db_params["check"]
257
258    def _rename_field_sql(
259        self, table: str, old_field: Field, new_field: Field, new_type: str
260    ) -> str:
261        new_type = self._set_field_new_type_null_status(old_field, new_type)
262        return super()._rename_field_sql(table, old_field, new_field, new_type)