1from __future__ import annotations
  2
  3from typing import TYPE_CHECKING, Any
  4
  5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  6from plain.models.constants import LOOKUP_SEP
  7from plain.models.constraints import UniqueConstraint
  8from plain.models.expressions import F
  9from plain.models.fields import NOT_PROVIDED
 10
 11if TYPE_CHECKING:
 12    from collections.abc import Sequence
 13
 14    from plain.models.base import Model
 15    from plain.models.constraints import BaseConstraint
 16    from plain.models.fields import Field
 17    from plain.models.indexes import Index
 18
 19
 20class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
 21    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"
 22
 23    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
 24    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
 25    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
 26    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"
 27
 28    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
 29    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
 30
 31    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
 32    sql_create_column_inline_fk = (
 33        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 34        "REFERENCES %(to_table)s(%(to_column)s)"
 35    )
 36    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"
 37
 38    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
 39    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"
 40
 41    sql_create_pk = (
 42        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 43    )
 44    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
 45
 46    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
 47
 48    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
 49    sql_alter_column_comment = None
 50
 51    @property
 52    def sql_delete_check(self) -> str:
 53        if self.connection.mysql_is_mariadb:
 54            # The name of the column check constraint is the same as the field
 55            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 56            # crash. Constraint is removed during a "MODIFY" column statement.
 57            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 58        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 59
 60    @property
 61    def sql_rename_column(self) -> str:
 62        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 63        # "ALTER TABLE ... RENAME COLUMN" statement.
 64        if self.connection.mysql_is_mariadb:
 65            if self.connection.mysql_version >= (10, 5, 2):
 66                return super().sql_rename_column
 67        elif self.connection.mysql_version >= (8, 0, 4):
 68            return super().sql_rename_column
 69        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 70
 71    def quote_value(self, value: Any) -> str:
 72        self.connection.ensure_connection()
 73        if isinstance(value, str):
 74            value = value.replace("%", "%%")
 75        # MySQLdb escapes to string, PyMySQL to bytes.
 76        quoted = self.connection.connection.escape(
 77            value, self.connection.connection.encoders
 78        )
 79        if isinstance(value, str) and isinstance(quoted, bytes):
 80            quoted = quoted.decode()
 81        return quoted
 82
 83    def _is_limited_data_type(self, field: Field) -> bool:
 84        db_type = field.db_type(self.connection)
 85        return (
 86            db_type is not None
 87            and db_type.lower() in self.connection._limited_data_types
 88        )
 89
 90    def skip_default(self, field: Field) -> bool:
 91        if not self._supports_limited_data_type_defaults:
 92            return self._is_limited_data_type(field)
 93        return False
 94
 95    def skip_default_on_alter(self, field: Field) -> bool:
 96        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
 97            # MySQL doesn't support defaults for BLOB and TEXT in the
 98            # ALTER COLUMN statement.
 99            return True
100        return False
101
102    @property
103    def _supports_limited_data_type_defaults(self) -> bool:
104        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
105        if self.connection.mysql_is_mariadb:
106            return True
107        return self.connection.mysql_version >= (8, 0, 13)
108
109    def _column_default_sql(self, field: Field) -> str:
110        if (
111            not self.connection.mysql_is_mariadb
112            and self._supports_limited_data_type_defaults
113            and self._is_limited_data_type(field)
114        ):
115            # MySQL supports defaults for BLOB and TEXT columns only if the
116            # default value is written as an expression i.e. in parentheses.
117            return "(%s)"
118        return super()._column_default_sql(field)
119
120    def add_field(self, model: type[Model], field: Field) -> None:
121        super().add_field(model, field)
122
123        # Simulate the effect of a one-off default.
124        # field.default may be unhashable, so a set isn't used for "in" check.
125        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
126            effective_default = self.effective_default(field)
127            self.execute(
128                f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
129                [effective_default],
130            )
131
132    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
133        if (
134            isinstance(constraint, UniqueConstraint)
135            and constraint.create_sql(model, self) is not None
136        ):
137            self._create_missing_fk_index(
138                model,
139                fields=constraint.fields,
140                expressions=constraint.expressions,
141            )
142        super().remove_constraint(model, constraint)
143
144    def remove_index(self, model: type[Model], index: Index) -> None:
145        self._create_missing_fk_index(
146            model,
147            fields=[field_name for field_name, _ in index.fields_orders],
148            expressions=index.expressions,
149        )
150        super().remove_index(model, index)
151
152    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
153        if not super()._field_should_be_indexed(model, field):
154            return False
155
156        storage = self.connection.introspection.get_storage_engine(
157            self.connection.cursor(), model.model_options.db_table
158        )
159        # No need to create an index for ForeignKey fields except if
160        # db_constraint=False because the index from that constraint won't be
161        # created.
162        if (
163            storage == "InnoDB"
164            and field.get_internal_type() == "ForeignKey"
165            and field.db_constraint  # type: ignore[attr-defined]
166        ):
167            return False
168        return not self._is_limited_data_type(field)
169
170    def _create_missing_fk_index(
171        self,
172        model: type[Model],
173        *,
174        fields: Sequence[str],
175        expressions: Sequence[Any] | None = None,
176    ) -> None:
177        """
178        MySQL can remove an implicit FK index on a field when that field is
179        covered by another index. "covered" here means
180        that the more complex index has the FK field as its first field (see
181        https://bugs.mysql.com/bug.php?id=37910).
182
183        Manually create an implicit FK index to make it possible to remove the
184        composed index.
185        """
186        first_field_name = None
187        if fields:
188            first_field_name = fields[0]
189        elif (
190            expressions
191            and self.connection.features.supports_expression_indexes
192            and isinstance(expressions[0], F)
193            and LOOKUP_SEP not in expressions[0].name
194        ):
195            first_field_name = expressions[0].name
196
197        if not first_field_name:
198            return
199
200        first_field = model._model_meta.get_field(first_field_name)
201        if first_field.get_internal_type() == "ForeignKey":
202            column = self.connection.introspection.identifier_converter(
203                first_field.column
204            )
205            with self.connection.cursor() as cursor:
206                constraint_names = [
207                    name
208                    for name, infodict in self.connection.introspection.get_constraints(
209                        cursor, model.model_options.db_table
210                    ).items()
211                    if infodict["index"] and infodict["columns"][0] == column
212                ]
213            # There are no other indexes that starts with the FK field, only
214            # the index that is expected to be deleted.
215            if len(constraint_names) == 1:
216                self.execute(
217                    self._create_index_sql(model, fields=[first_field], suffix="")
218                )
219
220    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
221        """
222        Keep the null property of the old field. If it has changed, it will be
223        handled separately.
224        """
225        if field.allow_null:
226            new_type += " NULL"
227        else:
228            new_type += " NOT NULL"
229        return new_type
230
231    def _alter_column_type_sql(
232        self,
233        model: type[Model],
234        old_field: Field,
235        new_field: Field,
236        new_type: str,
237        old_collation: str,
238        new_collation: str,
239    ) -> tuple[str, list[Any]]:
240        new_type = self._set_field_new_type_null_status(old_field, new_type)
241        return super()._alter_column_type_sql(
242            model, old_field, new_field, new_type, old_collation, new_collation
243        )
244
245    def _field_db_check(
246        self, field: Field, field_db_params: dict[str, Any]
247    ) -> str | None:
248        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
249            10,
250            5,
251            2,
252        ):
253            return super()._field_db_check(field, field_db_params)
254        # On MySQL and MariaDB < 10.5.2 (no support for
255        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
256        # the column name as it requires explicit recreation when the column is
257        # renamed.
258        return field_db_params["check"]
259
260    def _rename_field_sql(
261        self, table: str, old_field: Field, new_field: Field, new_type: str
262    ) -> str:
263        new_type = self._set_field_new_type_null_status(old_field, new_type)
264        return super()._rename_field_sql(table, old_field, new_field, new_type)
265
266    def _alter_column_comment_sql(
267        self, model: type[Model], new_field: Field, new_type: str, new_db_comment: str
268    ) -> tuple[str, list[Any]]:
269        # Comment is alter when altering the column type.
270        return "", []
271
272    def _comment_sql(self, comment: str | None) -> str:
273        comment_sql = super()._comment_sql(comment)
274        return f" COMMENT {comment_sql}"