Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from typing import TYPE_CHECKING, Any
  4
  5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  6from plain.models.constants import LOOKUP_SEP
  7from plain.models.constraints import UniqueConstraint
  8from plain.models.expressions import F
  9from plain.models.fields import NOT_PROVIDED
 10from plain.models.fields.related import ForeignKeyField
 11
 12if TYPE_CHECKING:
 13    from collections.abc import Sequence
 14
 15    from plain.models.backends.mysql.base import MySQLDatabaseWrapper
 16    from plain.models.base import Model
 17    from plain.models.constraints import BaseConstraint
 18    from plain.models.fields import Field
 19    from plain.models.indexes import Index
 20
 21
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the MySQL backend.

    The class attributes below override SQL templates of the base schema
    editor. Each template uses %-style named placeholders (for example
    ``%(table)s`` or ``%(column)s``). Several methods of this class branch on
    ``connection.mysql_is_mariadb`` and ``connection.mysql_version`` because
    MySQL and MariaDB differ in the statements they accept.
    """

    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
    connection: MySQLDatabaseWrapper

    # Table rename uses the standalone RENAME TABLE statement.
    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column alterations are expressed as MODIFY clauses, which restate the
    # column type along with the attribute being changed.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    # Unique constraints are dropped as indexes.
    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    # Leading ", " because this fragment is appended to another clause.
    # NOTE(review): presumably appended to the ADD COLUMN clause built by the
    # base editor -- confirm against BaseDatabaseSchemaEditor.
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    # Index statements name the owning table explicitly.
    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    # The primary key is dropped without naming it.
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # No standalone column-comment statement: the comment travels in the
    # %(comment)s slot of sql_alter_column_type instead.
    sql_alter_column_comment = None
 55
 56    @property
 57    def sql_delete_check(self) -> str:
 58        if self.connection.mysql_is_mariadb:
 59            # The name of the column check constraint is the same as the field
 60            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 61            # crash. Constraint is removed during a "MODIFY" column statement.
 62            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 63        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 64
 65    @property
 66    def sql_rename_column(self) -> str:
 67        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 68        # "ALTER TABLE ... RENAME COLUMN" statement.
 69        if self.connection.mysql_is_mariadb:
 70            if self.connection.mysql_version >= (10, 5, 2):
 71                return super().sql_rename_column
 72        elif self.connection.mysql_version >= (8, 0, 4):
 73            return super().sql_rename_column
 74        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 75
 76    def quote_value(self, value: Any) -> str:
 77        self.connection.ensure_connection()
 78        if isinstance(value, str):
 79            value = value.replace("%", "%%")
 80        # MySQLdb escapes to string, PyMySQL to bytes.
 81        quoted = self.connection.connection.escape(
 82            value, self.connection.connection.encoders
 83        )
 84        if isinstance(value, str) and isinstance(quoted, bytes):
 85            quoted = quoted.decode()
 86        return quoted
 87
 88    def _is_limited_data_type(self, field: Field) -> bool:
 89        db_type = field.db_type(self.connection)
 90        return (
 91            db_type is not None
 92            and db_type.lower() in self.connection._limited_data_types
 93        )
 94
 95    def skip_default(self, field: Field) -> bool:
 96        if not self._supports_limited_data_type_defaults:
 97            return self._is_limited_data_type(field)
 98        return False
 99
100    def skip_default_on_alter(self, field: Field) -> bool:
101        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
102            # MySQL doesn't support defaults for BLOB and TEXT in the
103            # ALTER COLUMN statement.
104            return True
105        return False
106
107    @property
108    def _supports_limited_data_type_defaults(self) -> bool:
109        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
110        if self.connection.mysql_is_mariadb:
111            return True
112        return self.connection.mysql_version >= (8, 0, 13)
113
114    def _column_default_sql(self, field: Field) -> str:
115        if (
116            not self.connection.mysql_is_mariadb
117            and self._supports_limited_data_type_defaults
118            and self._is_limited_data_type(field)
119        ):
120            # MySQL supports defaults for BLOB and TEXT columns only if the
121            # default value is written as an expression i.e. in parentheses.
122            return "(%s)"
123        return super()._column_default_sql(field)
124
125    def add_field(self, model: type[Model], field: Field) -> None:
126        super().add_field(model, field)
127
128        # Simulate the effect of a one-off default.
129        # field.default may be unhashable, so a set isn't used for "in" check.
130        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
131            effective_default = self.effective_default(field)
132            self.execute(
133                f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
134                [effective_default],
135            )
136
137    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
138        if (
139            isinstance(constraint, UniqueConstraint)
140            and constraint.create_sql(model, self) is not None
141        ):
142            self._create_missing_fk_index(
143                model,
144                fields=constraint.fields,
145                expressions=constraint.expressions,
146            )
147        super().remove_constraint(model, constraint)
148
149    def remove_index(self, model: type[Model], index: Index) -> None:
150        self._create_missing_fk_index(
151            model,
152            fields=[field_name for field_name, _ in index.fields_orders],
153            expressions=index.expressions,
154        )
155        super().remove_index(model, index)
156
157    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
158        if not super()._field_should_be_indexed(model, field):
159            return False
160
161        storage = self.connection.introspection.get_storage_engine(
162            self.connection.cursor(), model.model_options.db_table
163        )
164        # No need to create an index for ForeignKeyField fields except if
165        # db_constraint=False because the index from that constraint won't be
166        # created.
167        if (
168            storage == "InnoDB"
169            and isinstance(field, ForeignKeyField)
170            and field.db_constraint
171        ):
172            return False
173        return not self._is_limited_data_type(field)
174
175    def _create_missing_fk_index(
176        self,
177        model: type[Model],
178        *,
179        fields: Sequence[str],
180        expressions: Sequence[Any] | None = None,
181    ) -> None:
182        """
183        MySQL can remove an implicit FK index on a field when that field is
184        covered by another index. "covered" here means
185        that the more complex index has the FK field as its first field (see
186        https://bugs.mysql.com/bug.php?id=37910).
187
188        Manually create an implicit FK index to make it possible to remove the
189        composed index.
190        """
191        first_field_name = None
192        if fields:
193            first_field_name = fields[0]
194        elif (
195            expressions
196            and self.connection.features.supports_expression_indexes
197            and isinstance(expressions[0], F)
198            and LOOKUP_SEP not in expressions[0].name
199        ):
200            first_field_name = expressions[0].name
201
202        if not first_field_name:
203            return
204
205        first_field = model._model_meta.get_forward_field(first_field_name)
206        if first_field.get_internal_type() == "ForeignKeyField":
207            column = self.connection.introspection.identifier_converter(
208                first_field.column
209            )
210            with self.connection.cursor() as cursor:
211                constraint_names = [
212                    name
213                    for name, infodict in self.connection.introspection.get_constraints(
214                        cursor, model.model_options.db_table
215                    ).items()
216                    if infodict["index"] and infodict["columns"][0] == column
217                ]
218            # There are no other indexes that starts with the FK field, only
219            # the index that is expected to be deleted.
220            if len(constraint_names) == 1:
221                self.execute(
222                    self._create_index_sql(model, fields=[first_field], suffix="")
223                )
224
225    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
226        """
227        Keep the null property of the old field. If it has changed, it will be
228        handled separately.
229        """
230        if field.allow_null:
231            new_type += " NULL"
232        else:
233            new_type += " NOT NULL"
234        return new_type
235
236    def _alter_column_type_sql(
237        self,
238        model: type[Model],
239        old_field: Field,
240        new_field: Field,
241        new_type: str,
242        old_collation: str,
243        new_collation: str,
244    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
245        new_type = self._set_field_new_type_null_status(old_field, new_type)
246        return super()._alter_column_type_sql(
247            model, old_field, new_field, new_type, old_collation, new_collation
248        )
249
250    def _field_db_check(
251        self, field: Field, field_db_params: dict[str, Any]
252    ) -> str | None:
253        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
254            10,
255            5,
256            2,
257        ):
258            return super()._field_db_check(field, field_db_params)
259        # On MySQL and MariaDB < 10.5.2 (no support for
260        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
261        # the column name as it requires explicit recreation when the column is
262        # renamed.
263        return field_db_params["check"]
264
265    def _rename_field_sql(
266        self, table: str, old_field: Field, new_field: Field, new_type: str
267    ) -> str:
268        new_type = self._set_field_new_type_null_status(old_field, new_type)
269        return super()._rename_field_sql(table, old_field, new_field, new_type)
270
271    def _alter_column_comment_sql(
272        self, model: type[Model], new_field: Field, new_type: str, new_db_comment: str
273    ) -> tuple[str, list[Any]]:
274        # Comment is alter when altering the column type.
275        return "", []
276
277    def _comment_sql(self, comment: str | None) -> str:
278        comment_sql = super()._comment_sql(comment)
279        return f" COMMENT {comment_sql}"