Plain is headed towards 1.0! Subscribe for development updates →

  1from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
  2from plain.models.constants import LOOKUP_SEP
  3from plain.models.fields import NOT_PROVIDED, F, UniqueConstraint
  4
  5
  6class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
  7    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"
  8
  9    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
 10    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
 11    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
 12    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"
 13
 14    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
 15    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
 16
 17    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
 18    sql_create_column_inline_fk = (
 19        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 20        "REFERENCES %(to_table)s(%(to_column)s)"
 21    )
 22    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"
 23
 24    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
 25    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"
 26
 27    sql_create_pk = (
 28        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 29    )
 30    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
 31
 32    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
 33
 34    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
 35    sql_alter_column_comment = None
 36
 37    @property
 38    def sql_delete_check(self):
 39        if self.connection.mysql_is_mariadb:
 40            # The name of the column check constraint is the same as the field
 41            # name on MariaDB. Adding IF EXISTS clause prevents migrations
 42            # crash. Constraint is removed during a "MODIFY" column statement.
 43            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
 44        return "ALTER TABLE %(table)s DROP CHECK %(name)s"
 45
 46    @property
 47    def sql_rename_column(self):
 48        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
 49        # "ALTER TABLE ... RENAME COLUMN" statement.
 50        if self.connection.mysql_is_mariadb:
 51            if self.connection.mysql_version >= (10, 5, 2):
 52                return super().sql_rename_column
 53        elif self.connection.mysql_version >= (8, 0, 4):
 54            return super().sql_rename_column
 55        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
 56
 57    def quote_value(self, value):
 58        self.connection.ensure_connection()
 59        if isinstance(value, str):
 60            value = value.replace("%", "%%")
 61        # MySQLdb escapes to string, PyMySQL to bytes.
 62        quoted = self.connection.connection.escape(
 63            value, self.connection.connection.encoders
 64        )
 65        if isinstance(value, str) and isinstance(quoted, bytes):
 66            quoted = quoted.decode()
 67        return quoted
 68
 69    def _is_limited_data_type(self, field):
 70        db_type = field.db_type(self.connection)
 71        return (
 72            db_type is not None
 73            and db_type.lower() in self.connection._limited_data_types
 74        )
 75
 76    def skip_default(self, field):
 77        if not self._supports_limited_data_type_defaults:
 78            return self._is_limited_data_type(field)
 79        return False
 80
 81    def skip_default_on_alter(self, field):
 82        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
 83            # MySQL doesn't support defaults for BLOB and TEXT in the
 84            # ALTER COLUMN statement.
 85            return True
 86        return False
 87
 88    @property
 89    def _supports_limited_data_type_defaults(self):
 90        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
 91        if self.connection.mysql_is_mariadb:
 92            return True
 93        return self.connection.mysql_version >= (8, 0, 13)
 94
 95    def _column_default_sql(self, field):
 96        if (
 97            not self.connection.mysql_is_mariadb
 98            and self._supports_limited_data_type_defaults
 99            and self._is_limited_data_type(field)
100        ):
101            # MySQL supports defaults for BLOB and TEXT columns only if the
102            # default value is written as an expression i.e. in parentheses.
103            return "(%s)"
104        return super()._column_default_sql(field)
105
106    def add_field(self, model, field):
107        super().add_field(model, field)
108
109        # Simulate the effect of a one-off default.
110        # field.default may be unhashable, so a set isn't used for "in" check.
111        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
112            effective_default = self.effective_default(field)
113            self.execute(
114                "UPDATE {table} SET {column} = %s".format(
115                    table=self.quote_name(model._meta.db_table),
116                    column=self.quote_name(field.column),
117                ),
118                [effective_default],
119            )
120
121    def remove_constraint(self, model, constraint):
122        if (
123            isinstance(constraint, UniqueConstraint)
124            and constraint.create_sql(model, self) is not None
125        ):
126            self._create_missing_fk_index(
127                model,
128                fields=constraint.fields,
129                expressions=constraint.expressions,
130            )
131        super().remove_constraint(model, constraint)
132
133    def remove_index(self, model, index):
134        self._create_missing_fk_index(
135            model,
136            fields=[field_name for field_name, _ in index.fields_orders],
137            expressions=index.expressions,
138        )
139        super().remove_index(model, index)
140
141    def _field_should_be_indexed(self, model, field):
142        if not super()._field_should_be_indexed(model, field):
143            return False
144
145        storage = self.connection.introspection.get_storage_engine(
146            self.connection.cursor(), model._meta.db_table
147        )
148        # No need to create an index for ForeignKey fields except if
149        # db_constraint=False because the index from that constraint won't be
150        # created.
151        if (
152            storage == "InnoDB"
153            and field.get_internal_type() == "ForeignKey"
154            and field.db_constraint
155        ):
156            return False
157        return not self._is_limited_data_type(field)
158
159    def _create_missing_fk_index(
160        self,
161        model,
162        *,
163        fields,
164        expressions=None,
165    ):
166        """
167        MySQL can remove an implicit FK index on a field when that field is
168        covered by another index. "covered" here means
169        that the more complex index has the FK field as its first field (see
170        https://bugs.mysql.com/bug.php?id=37910).
171
172        Manually create an implicit FK index to make it possible to remove the
173        composed index.
174        """
175        first_field_name = None
176        if fields:
177            first_field_name = fields[0]
178        elif (
179            expressions
180            and self.connection.features.supports_expression_indexes
181            and isinstance(expressions[0], F)
182            and LOOKUP_SEP not in expressions[0].name
183        ):
184            first_field_name = expressions[0].name
185
186        if not first_field_name:
187            return
188
189        first_field = model._meta.get_field(first_field_name)
190        if first_field.get_internal_type() == "ForeignKey":
191            column = self.connection.introspection.identifier_converter(
192                first_field.column
193            )
194            with self.connection.cursor() as cursor:
195                constraint_names = [
196                    name
197                    for name, infodict in self.connection.introspection.get_constraints(
198                        cursor, model._meta.db_table
199                    ).items()
200                    if infodict["index"] and infodict["columns"][0] == column
201                ]
202            # There are no other indexes that starts with the FK field, only
203            # the index that is expected to be deleted.
204            if len(constraint_names) == 1:
205                self.execute(
206                    self._create_index_sql(model, fields=[first_field], suffix="")
207                )
208
209    def _set_field_new_type_null_status(self, field, new_type):
210        """
211        Keep the null property of the old field. If it has changed, it will be
212        handled separately.
213        """
214        if field.null:
215            new_type += " NULL"
216        else:
217            new_type += " NOT NULL"
218        return new_type
219
220    def _alter_column_type_sql(
221        self, model, old_field, new_field, new_type, old_collation, new_collation
222    ):
223        new_type = self._set_field_new_type_null_status(old_field, new_type)
224        return super()._alter_column_type_sql(
225            model, old_field, new_field, new_type, old_collation, new_collation
226        )
227
228    def _field_db_check(self, field, field_db_params):
229        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
230            10,
231            5,
232            2,
233        ):
234            return super()._field_db_check(field, field_db_params)
235        # On MySQL and MariaDB < 10.5.2 (no support for
236        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
237        # the column name as it requires explicit recreation when the column is
238        # renamed.
239        return field_db_params["check"]
240
241    def _rename_field_sql(self, table, old_field, new_field, new_type):
242        new_type = self._set_field_new_type_null_status(old_field, new_type)
243        return super()._rename_field_sql(table, old_field, new_field, new_type)
244
245    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
246        # Comment is alter when altering the column type.
247        return "", []
248
249    def _comment_sql(self, comment):
250        comment_sql = super()._comment_sql(comment)
251        return f" COMMENT {comment_sql}"