Plain is headed towards 1.0! Subscribe for development updates →

  1from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  2from plain.models.constants import LOOKUP_SEP
  3from plain.models.constraints import UniqueConstraint
  4from plain.models.expressions import F
  5from plain.models.fields import NOT_PROVIDED
  6
  7
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for MySQL and MariaDB.

    Replaces SQL templates and hooks inherited from BaseDatabaseSchemaEditor
    with MySQL-flavoured statements. The ``%(...)s`` markers in the templates
    are named placeholders; presumably the base class fills them in before a
    statement is executed (that code is not visible here).
    """

    # Statement used to rename a table.
    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column alteration clauses. They carry no "ALTER TABLE" prefix of their
    # own, so they are presumably embedded in a statement built elsewhere.
    # Each MODIFY clause restates the column type; the type template also
    # carries the collation and comment fragments.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    # Unique constraints are dropped with DROP INDEX; foreign keys are added
    # inline (the leading ", " continues an existing statement) and dropped
    # with DROP FOREIGN KEY.
    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    # Index removal and renaming.
    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    # Primary key creation and removal.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    # Index creation.
    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    # Table comments have a dedicated statement. There is no standalone
    # column-comment template: the comment fragment is part of
    # sql_alter_column_type above.
    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    sql_alter_column_comment = None
 38
 39    @property
 40    def sql_delete_check(self):
 41        if self.connection.mysql_is_mariadb:
 42            # The name of the column check constraint is the same as the field
 43            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 44            # crash. Constraint is removed during a "MODIFY" column statement.
 45            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 46        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 47
 48    @property
 49    def sql_rename_column(self):
 50        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 51        # "ALTER TABLE ... RENAME COLUMN" statement.
 52        if self.connection.mysql_is_mariadb:
 53            if self.connection.mysql_version >= (10, 5, 2):
 54                return super().sql_rename_column
 55        elif self.connection.mysql_version >= (8, 0, 4):
 56            return super().sql_rename_column
 57        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 58
 59    def quote_value(self, value):
 60        self.connection.ensure_connection()
 61        if isinstance(value, str):
 62            value = value.replace("%", "%%")
 63        # MySQLdb escapes to string, PyMySQL to bytes.
 64        quoted = self.connection.connection.escape(
 65            value, self.connection.connection.encoders
 66        )
 67        if isinstance(value, str) and isinstance(quoted, bytes):
 68            quoted = quoted.decode()
 69        return quoted
 70
 71    def _is_limited_data_type(self, field):
 72        db_type = field.db_type(self.connection)
 73        return (
 74            db_type is not None
 75            and db_type.lower() in self.connection._limited_data_types
 76        )
 77
 78    def skip_default(self, field):
 79        if not self._supports_limited_data_type_defaults:
 80            return self._is_limited_data_type(field)
 81        return False
 82
 83    def skip_default_on_alter(self, field):
 84        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
 85            # MySQL doesn't support defaults for BLOB and TEXT in the
 86            # ALTER COLUMN statement.
 87            return True
 88        return False
 89
 90    @property
 91    def _supports_limited_data_type_defaults(self):
 92        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
 93        if self.connection.mysql_is_mariadb:
 94            return True
 95        return self.connection.mysql_version >= (8, 0, 13)
 96
 97    def _column_default_sql(self, field):
 98        if (
 99            not self.connection.mysql_is_mariadb
100            and self._supports_limited_data_type_defaults
101            and self._is_limited_data_type(field)
102        ):
103            # MySQL supports defaults for BLOB and TEXT columns only if the
104            # default value is written as an expression i.e. in parentheses.
105            return "(%s)"
106        return super()._column_default_sql(field)
107
108    def add_field(self, model, field):
109        super().add_field(model, field)
110
111        # Simulate the effect of a one-off default.
112        # field.default may be unhashable, so a set isn't used for "in" check.
113        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
114            effective_default = self.effective_default(field)
115            self.execute(
116                f"UPDATE {self.quote_name(model._meta.db_table)} SET {self.quote_name(field.column)} = %s",
117                [effective_default],
118            )
119
120    def remove_constraint(self, model, constraint):
121        if (
122            isinstance(constraint, UniqueConstraint)
123            and constraint.create_sql(model, self) is not None
124        ):
125            self._create_missing_fk_index(
126                model,
127                fields=constraint.fields,
128                expressions=constraint.expressions,
129            )
130        super().remove_constraint(model, constraint)
131
132    def remove_index(self, model, index):
133        self._create_missing_fk_index(
134            model,
135            fields=[field_name for field_name, _ in index.fields_orders],
136            expressions=index.expressions,
137        )
138        super().remove_index(model, index)
139
140    def _field_should_be_indexed(self, model, field):
141        if not super()._field_should_be_indexed(model, field):
142            return False
143
144        storage = self.connection.introspection.get_storage_engine(
145            self.connection.cursor(), model._meta.db_table
146        )
147        # No need to create an index for ForeignKey fields except if
148        # db_constraint=False because the index from that constraint won't be
149        # created.
150        if (
151            storage == "InnoDB"
152            and field.get_internal_type() == "ForeignKey"
153            and field.db_constraint
154        ):
155            return False
156        return not self._is_limited_data_type(field)
157
158    def _create_missing_fk_index(
159        self,
160        model,
161        *,
162        fields,
163        expressions=None,
164    ):
165        """
166        MySQL can remove an implicit FK index on a field when that field is
167        covered by another index. "covered" here means
168        that the more complex index has the FK field as its first field (see
169        https://bugs.mysql.com/bug.php?id=37910).
170
171        Manually create an implicit FK index to make it possible to remove the
172        composed index.
173        """
174        first_field_name = None
175        if fields:
176            first_field_name = fields[0]
177        elif (
178            expressions
179            and self.connection.features.supports_expression_indexes
180            and isinstance(expressions[0], F)
181            and LOOKUP_SEP not in expressions[0].name
182        ):
183            first_field_name = expressions[0].name
184
185        if not first_field_name:
186            return
187
188        first_field = model._meta.get_field(first_field_name)
189        if first_field.get_internal_type() == "ForeignKey":
190            column = self.connection.introspection.identifier_converter(
191                first_field.column
192            )
193            with self.connection.cursor() as cursor:
194                constraint_names = [
195                    name
196                    for name, infodict in self.connection.introspection.get_constraints(
197                        cursor, model._meta.db_table
198                    ).items()
199                    if infodict["index"] and infodict["columns"][0] == column
200                ]
201            # There are no other indexes that starts with the FK field, only
202            # the index that is expected to be deleted.
203            if len(constraint_names) == 1:
204                self.execute(
205                    self._create_index_sql(model, fields=[first_field], suffix="")
206                )
207
208    def _set_field_new_type_null_status(self, field, new_type):
209        """
210        Keep the null property of the old field. If it has changed, it will be
211        handled separately.
212        """
213        if field.allow_null:
214            new_type += " NULL"
215        else:
216            new_type += " NOT NULL"
217        return new_type
218
219    def _alter_column_type_sql(
220        self, model, old_field, new_field, new_type, old_collation, new_collation
221    ):
222        new_type = self._set_field_new_type_null_status(old_field, new_type)
223        return super()._alter_column_type_sql(
224            model, old_field, new_field, new_type, old_collation, new_collation
225        )
226
227    def _field_db_check(self, field, field_db_params):
228        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
229            10,
230            5,
231            2,
232        ):
233            return super()._field_db_check(field, field_db_params)
234        # On MySQL and MariaDB < 10.5.2 (no support for
235        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
236        # the column name as it requires explicit recreation when the column is
237        # renamed.
238        return field_db_params["check"]
239
240    def _rename_field_sql(self, table, old_field, new_field, new_type):
241        new_type = self._set_field_new_type_null_status(old_field, new_type)
242        return super()._rename_field_sql(table, old_field, new_field, new_type)
243
244    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
245        # Comment is alter when altering the column type.
246        return "", []
247
248    def _comment_sql(self, comment):
249        comment_sql = super()._comment_sql(comment)
250        return f" COMMENT {comment_sql}"