Plain is headed towards 1.0! Subscribe for development updates →

  1from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  2from plain.models.constants import LOOKUP_SEP
  3from plain.models.fields import NOT_PROVIDED, F, UniqueConstraint
  6class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
  7    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"
  9    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
 10    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
 11    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
 12    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"
 14    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
 15    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
 17    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
 18    sql_create_column_inline_fk = (
 19        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 20        "REFERENCES %(to_table)s(%(to_column)s)"
 21    )
 22    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"
 24    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
 25    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"
 27    sql_create_pk = (
 28        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 29    )
 30    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
 32    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
 34    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
 35    sql_alter_column_comment = None
 37    @property
 38    def sql_delete_check(self):
 39        if self.connection.mysql_is_mariadb:
 40            # The name of the column check constraint is the same as the field
 41            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 42            # crash. Constraint is removed during a "MODIFY" column statement.
 43            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 44        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 46    @property
 47    def sql_rename_column(self):
 48        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 49        # "ALTER TABLE ... RENAME COLUMN" statement.
 50        if self.connection.mysql_is_mariadb:
 51            if self.connection.mysql_version >= (10, 5, 2):
 52                return super().sql_rename_column
 53        elif self.connection.mysql_version >= (8, 0, 4):
 54            return super().sql_rename_column
 55        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 57    def quote_value(self, value):
 58        self.connection.ensure_connection()
 59        if isinstance(value, str):
 60            value = value.replace("%", "%%")
 61        # MySQLdb escapes to string, PyMySQL to bytes.
 62        quoted = self.connection.connection.escape(
 63            value, self.connection.connection.encoders
 64        )
 65        if isinstance(value, str) and isinstance(quoted, bytes):
 66            quoted = quoted.decode()
 67        return quoted
 69    def _is_limited_data_type(self, field):
 70        db_type = field.db_type(self.connection)
 71        return (
 72            db_type is not None
 73            and db_type.lower() in self.connection._limited_data_types
 74        )
 76    def skip_default(self, field):
 77        if not self._supports_limited_data_type_defaults:
 78            return self._is_limited_data_type(field)
 79        return False
 81    def skip_default_on_alter(self, field):
 82        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
 83            # MySQL doesn't support defaults for BLOB and TEXT in the
 84            # ALTER COLUMN statement.
 85            return True
 86        return False
 88    @property
 89    def _supports_limited_data_type_defaults(self):
 90        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
 91        if self.connection.mysql_is_mariadb:
 92            return True
 93        return self.connection.mysql_version >= (8, 0, 13)
 95    def _column_default_sql(self, field):
 96        if (
 97            not self.connection.mysql_is_mariadb
 98            and self._supports_limited_data_type_defaults
 99            and self._is_limited_data_type(field)
100        ):
101            # MySQL supports defaults for BLOB and TEXT columns only if the
102            # default value is written as an expression i.e. in parentheses.
103            return "(%s)"
104        return super()._column_default_sql(field)
106    def add_field(self, model, field):
107        super().add_field(model, field)
109        # Simulate the effect of a one-off default.
110        # field.default may be unhashable, so a set isn't used for "in" check.
111        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
112            effective_default = self.effective_default(field)
113            self.execute(
114                "UPDATE {table} SET {column} = %s".format(
115                    table=self.quote_name(model._meta.db_table),
116                    column=self.quote_name(field.column),
117                ),
118                [effective_default],
119            )
121    def remove_constraint(self, model, constraint):
122        if (
123            isinstance(constraint, UniqueConstraint)
124            and constraint.create_sql(model, self) is not None
125        ):
126            self._create_missing_fk_index(
127                model,
128                fields=constraint.fields,
129                expressions=constraint.expressions,
130            )
131        super().remove_constraint(model, constraint)
133    def remove_index(self, model, index):
134        self._create_missing_fk_index(
135            model,
136            fields=[field_name for field_name, _ in index.fields_orders],
137            expressions=index.expressions,
138        )
139        super().remove_index(model, index)
141    def _field_should_be_indexed(self, model, field):
142        if not super()._field_should_be_indexed(model, field):
143            return False
145        storage = self.connection.introspection.get_storage_engine(
146            self.connection.cursor(), model._meta.db_table
147        )
148        # No need to create an index for ForeignKey fields except if
149        # db_constraint=False because the index from that constraint won't be
150        # created.
151        if (
152            storage == "InnoDB"
153            and field.get_internal_type() == "ForeignKey"
154            and field.db_constraint
155        ):
156            return False
157        return not self._is_limited_data_type(field)
159    def _create_missing_fk_index(
160        self,
161        model,
162        *,
163        fields,
164        expressions=None,
165    ):
166        """
167        MySQL can remove an implicit FK index on a field when that field is
168        covered by another index. "covered" here means
169        that the more complex index has the FK field as its first field (see
172        Manually create an implicit FK index to make it possible to remove the
173        composed index.
174        """
175        first_field_name = None
176        if fields:
177            first_field_name = fields[0]
178        elif (
179            expressions
180            and self.connection.features.supports_expression_indexes
181            and isinstance(expressions[0], F)
182            and LOOKUP_SEP not in expressions[0].name
183        ):
184            first_field_name = expressions[0].name
186        if not first_field_name:
187            return
189        first_field = model._meta.get_field(first_field_name)
190        if first_field.get_internal_type() == "ForeignKey":
191            column = self.connection.introspection.identifier_converter(
192                first_field.column
193            )
194            with self.connection.cursor() as cursor:
195                constraint_names = [
196                    name
197                    for name, infodict in self.connection.introspection.get_constraints(
198                        cursor, model._meta.db_table
199                    ).items()
200                    if infodict["index"] and infodict["columns"][0] == column
201                ]
202            # There are no other indexes that starts with the FK field, only
203            # the index that is expected to be deleted.
204            if len(constraint_names) == 1:
205                self.execute(
206                    self._create_index_sql(model, fields=[first_field], suffix="")
207                )
209    def _set_field_new_type_null_status(self, field, new_type):
210        """
211        Keep the null property of the old field. If it has changed, it will be
212        handled separately.
213        """
214        if field.null:
215            new_type += " NULL"
216        else:
217            new_type += " NOT NULL"
218        return new_type
220    def _alter_column_type_sql(
221        self, model, old_field, new_field, new_type, old_collation, new_collation
222    ):
223        new_type = self._set_field_new_type_null_status(old_field, new_type)
224        return super()._alter_column_type_sql(
225            model, old_field, new_field, new_type, old_collation, new_collation
226        )
228    def _field_db_check(self, field, field_db_params):
229        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
230            10,
231            5,
232            2,
233        ):
234            return super()._field_db_check(field, field_db_params)
235        # On MySQL and MariaDB < 10.5.2 (no support for
236        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
237        # the column name as it requires explicit recreation when the column is
238        # renamed.
239        return field_db_params["check"]
241    def _rename_field_sql(self, table, old_field, new_field, new_type):
242        new_type = self._set_field_new_type_null_status(old_field, new_type)
243        return super()._rename_field_sql(table, old_field, new_field, new_type)
245    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
246        # Comment is alter when altering the column type.
247        return "", []
249    def _comment_sql(self, comment):
250        comment_sql = super()._comment_sql(comment)
251        return f" COMMENT {comment_sql}"