Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from typing import TYPE_CHECKING, Any
  4
  5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  6from plain.models.constants import LOOKUP_SEP
  7from plain.models.constraints import UniqueConstraint
  8from plain.models.expressions import F
  9from plain.models.fields import NOT_PROVIDED
 10
 11if TYPE_CHECKING:
 12    from collections.abc import Sequence
 13
 14    from plain.models.backends.mysql.base import MySQLDatabaseWrapper
 15    from plain.models.base import Model
 16    from plain.models.constraints import BaseConstraint
 17    from plain.models.fields import Field
 18    from plain.models.indexes import Index
 19
 20
 21class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
 22    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
 23    connection: MySQLDatabaseWrapper
 24
 25    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"
 26
 27    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
 28    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
 29    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
 30    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"
 31
 32    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
 33    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
 34
 35    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
 36    sql_create_column_inline_fk = (
 37        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 38        "REFERENCES %(to_table)s(%(to_column)s)"
 39    )
 40    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"
 41
 42    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
 43    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"
 44
 45    sql_create_pk = (
 46        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 47    )
 48    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
 49
 50    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
 51
 52    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
 53    sql_alter_column_comment = None
 54
 55    @property
 56    def sql_delete_check(self) -> str:
 57        if self.connection.mysql_is_mariadb:
 58            # The name of the column check constraint is the same as the field
 59            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 60            # crash. Constraint is removed during a "MODIFY" column statement.
 61            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 62        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 63
 64    @property
 65    def sql_rename_column(self) -> str:
 66        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 67        # "ALTER TABLE ... RENAME COLUMN" statement.
 68        if self.connection.mysql_is_mariadb:
 69            if self.connection.mysql_version >= (10, 5, 2):
 70                return super().sql_rename_column
 71        elif self.connection.mysql_version >= (8, 0, 4):
 72            return super().sql_rename_column
 73        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 74
 75    def quote_value(self, value: Any) -> str:
 76        self.connection.ensure_connection()
 77        if isinstance(value, str):
 78            value = value.replace("%", "%%")
 79        # MySQLdb escapes to string, PyMySQL to bytes.
 80        quoted = self.connection.connection.escape(
 81            value, self.connection.connection.encoders
 82        )
 83        if isinstance(value, str) and isinstance(quoted, bytes):
 84            quoted = quoted.decode()
 85        return quoted
 86
 87    def _is_limited_data_type(self, field: Field) -> bool:
 88        db_type = field.db_type(self.connection)
 89        return (
 90            db_type is not None
 91            and db_type.lower() in self.connection._limited_data_types
 92        )
 93
 94    def skip_default(self, field: Field) -> bool:
 95        if not self._supports_limited_data_type_defaults:
 96            return self._is_limited_data_type(field)
 97        return False
 98
 99    def skip_default_on_alter(self, field: Field) -> bool:
100        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
101            # MySQL doesn't support defaults for BLOB and TEXT in the
102            # ALTER COLUMN statement.
103            return True
104        return False
105
106    @property
107    def _supports_limited_data_type_defaults(self) -> bool:
108        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
109        if self.connection.mysql_is_mariadb:
110            return True
111        return self.connection.mysql_version >= (8, 0, 13)
112
113    def _column_default_sql(self, field: Field) -> str:
114        if (
115            not self.connection.mysql_is_mariadb
116            and self._supports_limited_data_type_defaults
117            and self._is_limited_data_type(field)
118        ):
119            # MySQL supports defaults for BLOB and TEXT columns only if the
120            # default value is written as an expression i.e. in parentheses.
121            return "(%s)"
122        return super()._column_default_sql(field)
123
124    def add_field(self, model: type[Model], field: Field) -> None:
125        super().add_field(model, field)
126
127        # Simulate the effect of a one-off default.
128        # field.default may be unhashable, so a set isn't used for "in" check.
129        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
130            effective_default = self.effective_default(field)
131            self.execute(
132                f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
133                [effective_default],
134            )
135
136    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
137        if (
138            isinstance(constraint, UniqueConstraint)
139            and constraint.create_sql(model, self) is not None
140        ):
141            self._create_missing_fk_index(
142                model,
143                fields=constraint.fields,
144                expressions=constraint.expressions,
145            )
146        super().remove_constraint(model, constraint)
147
148    def remove_index(self, model: type[Model], index: Index) -> None:
149        self._create_missing_fk_index(
150            model,
151            fields=[field_name for field_name, _ in index.fields_orders],
152            expressions=index.expressions,
153        )
154        super().remove_index(model, index)
155
156    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
157        if not super()._field_should_be_indexed(model, field):
158            return False
159
160        storage = self.connection.introspection.get_storage_engine(
161            self.connection.cursor(), model.model_options.db_table
162        )
163        # No need to create an index for ForeignKey fields except if
164        # db_constraint=False because the index from that constraint won't be
165        # created.
166        if (
167            storage == "InnoDB"
168            and field.get_internal_type() == "ForeignKey"
169            and field.db_constraint  # type: ignore[attr-defined]
170        ):
171            return False
172        return not self._is_limited_data_type(field)
173
174    def _create_missing_fk_index(
175        self,
176        model: type[Model],
177        *,
178        fields: Sequence[str],
179        expressions: Sequence[Any] | None = None,
180    ) -> None:
181        """
182        MySQL can remove an implicit FK index on a field when that field is
183        covered by another index. "covered" here means
184        that the more complex index has the FK field as its first field (see
185        https://bugs.mysql.com/bug.php?id=37910).
186
187        Manually create an implicit FK index to make it possible to remove the
188        composed index.
189        """
190        first_field_name = None
191        if fields:
192            first_field_name = fields[0]
193        elif (
194            expressions
195            and self.connection.features.supports_expression_indexes
196            and isinstance(expressions[0], F)
197            and LOOKUP_SEP not in expressions[0].name
198        ):
199            first_field_name = expressions[0].name
200
201        if not first_field_name:
202            return
203
204        first_field = model._model_meta.get_field(first_field_name)
205        if first_field.get_internal_type() == "ForeignKey":
206            column = self.connection.introspection.identifier_converter(
207                first_field.column
208            )
209            with self.connection.cursor() as cursor:
210                constraint_names = [
211                    name
212                    for name, infodict in self.connection.introspection.get_constraints(
213                        cursor, model.model_options.db_table
214                    ).items()
215                    if infodict["index"] and infodict["columns"][0] == column
216                ]
217            # There are no other indexes that starts with the FK field, only
218            # the index that is expected to be deleted.
219            if len(constraint_names) == 1:
220                self.execute(
221                    self._create_index_sql(model, fields=[first_field], suffix="")
222                )
223
224    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
225        """
226        Keep the null property of the old field. If it has changed, it will be
227        handled separately.
228        """
229        if field.allow_null:
230            new_type += " NULL"
231        else:
232            new_type += " NOT NULL"
233        return new_type
234
235    def _alter_column_type_sql(
236        self,
237        model: type[Model],
238        old_field: Field,
239        new_field: Field,
240        new_type: str,
241        old_collation: str,
242        new_collation: str,
243    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
244        new_type = self._set_field_new_type_null_status(old_field, new_type)
245        return super()._alter_column_type_sql(
246            model, old_field, new_field, new_type, old_collation, new_collation
247        )
248
249    def _field_db_check(
250        self, field: Field, field_db_params: dict[str, Any]
251    ) -> str | None:
252        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
253            10,
254            5,
255            2,
256        ):
257            return super()._field_db_check(field, field_db_params)
258        # On MySQL and MariaDB < 10.5.2 (no support for
259        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
260        # the column name as it requires explicit recreation when the column is
261        # renamed.
262        return field_db_params["check"]
263
264    def _rename_field_sql(
265        self, table: str, old_field: Field, new_field: Field, new_type: str
266    ) -> str:
267        new_type = self._set_field_new_type_null_status(old_field, new_type)
268        return super()._rename_field_sql(table, old_field, new_field, new_type)
269
270    def _alter_column_comment_sql(
271        self, model: type[Model], new_field: Field, new_type: str, new_db_comment: str
272    ) -> tuple[str, list[Any]]:
273        # Comment is alter when altering the column type.
274        return "", []
275
276    def _comment_sql(self, comment: str | None) -> str:
277        comment_sql = super()._comment_sql(comment)
278        return f" COMMENT {comment_sql}"