1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any
4
5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
6from plain.models.constants import LOOKUP_SEP
7from plain.models.constraints import UniqueConstraint
8from plain.models.expressions import F
9from plain.models.fields import NOT_PROVIDED, DbParameters
10from plain.models.fields.related import ForeignKeyField
11
12if TYPE_CHECKING:
13 from collections.abc import Sequence
14
15 from plain.models.backends.mysql.base import MySQLDatabaseWrapper
16 from plain.models.base import Model
17 from plain.models.constraints import BaseConstraint
18 from plain.models.fields import Field
19 from plain.models.indexes import Index
20
21
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """Schema editor for MySQL/MariaDB connections.

    The class attributes below are SQL statement templates written with
    ``%(name)s``-style placeholders, using MySQL syntax (``MODIFY``,
    ``DROP INDEX``, ``DROP FOREIGN KEY``, ...).
    """

    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
    connection: MySQLDatabaseWrapper

    # Table renames use a standalone RENAME TABLE statement.
    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column alterations use MODIFY clauses, which restate the column type
    # alongside the nullability being set.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s"
    # Clause that resets a column's default to NULL.
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    # Unique constraints are dropped through DROP INDEX.
    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    # Fragment starting with a comma: it is written as a continuation that
    # adds the foreign key constraint after a preceding clause.
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    # Index statements name the owning table explicitly.
    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    # Primary key management; dropping needs no constraint name.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    # %(extra)s is appended directly after the column list.
    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
52
53 @property
54 def sql_delete_check(self) -> str:
55 if self.connection.mysql_is_mariadb:
56 # The name of the column check constraint is the same as the field
57 # name on MariaDB. Adding IF EXISTS clause prevents migrations
58 # crash. Constraint is removed during a "MODIFY" column statement.
59 return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
60 return "ALTER TABLE %(table)s DROP CHECK %(name)s"
61
62 @property
63 def sql_rename_column(self) -> str:
64 # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
65 # "ALTER TABLE ... RENAME COLUMN" statement.
66 if self.connection.mysql_is_mariadb:
67 if self.connection.mysql_version >= (10, 5, 2):
68 return super().sql_rename_column
69 elif self.connection.mysql_version >= (8, 0, 4):
70 return super().sql_rename_column
71 return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
72
73 def quote_value(self, value: Any) -> str:
74 self.connection.ensure_connection()
75 if isinstance(value, str):
76 value = value.replace("%", "%%")
77 # MySQLdb escapes to string, PyMySQL to bytes.
78 quoted = self.connection.connection.escape(
79 value, self.connection.connection.encoders
80 )
81 if isinstance(value, str) and isinstance(quoted, bytes):
82 quoted = quoted.decode()
83 return quoted
84
85 def _is_limited_data_type(self, field: Field) -> bool:
86 db_type = field.db_type(self.connection)
87 return (
88 db_type is not None
89 and db_type.lower() in self.connection._limited_data_types
90 )
91
92 def skip_default(self, field: Field) -> bool:
93 if not self._supports_limited_data_type_defaults:
94 return self._is_limited_data_type(field)
95 return False
96
97 def skip_default_on_alter(self, field: Field) -> bool:
98 if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
99 # MySQL doesn't support defaults for BLOB and TEXT in the
100 # ALTER COLUMN statement.
101 return True
102 return False
103
104 @property
105 def _supports_limited_data_type_defaults(self) -> bool:
106 # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
107 if self.connection.mysql_is_mariadb:
108 return True
109 return self.connection.mysql_version >= (8, 0, 13)
110
111 def _column_default_sql(self, field: Field) -> str:
112 if (
113 not self.connection.mysql_is_mariadb
114 and self._supports_limited_data_type_defaults
115 and self._is_limited_data_type(field)
116 ):
117 # MySQL supports defaults for BLOB and TEXT columns only if the
118 # default value is written as an expression i.e. in parentheses.
119 return "(%s)"
120 return super()._column_default_sql(field)
121
122 def add_field(self, model: type[Model], field: Field) -> None:
123 super().add_field(model, field)
124
125 # Simulate the effect of a one-off default.
126 # field.default may be unhashable, so a set isn't used for "in" check.
127 if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
128 effective_default = self.effective_default(field)
129 self.execute(
130 f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
131 [effective_default],
132 )
133
134 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
135 if (
136 isinstance(constraint, UniqueConstraint)
137 and constraint.create_sql(model, self) is not None
138 ):
139 self._create_missing_fk_index(
140 model,
141 fields=constraint.fields,
142 expressions=constraint.expressions,
143 )
144 super().remove_constraint(model, constraint)
145
146 def remove_index(self, model: type[Model], index: Index) -> None:
147 self._create_missing_fk_index(
148 model,
149 fields=[field_name for field_name, _ in index.fields_orders],
150 expressions=index.expressions,
151 )
152 super().remove_index(model, index)
153
154 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
155 if not super()._field_should_be_indexed(model, field):
156 return False
157
158 storage = self.connection.introspection.get_storage_engine(
159 self.connection.cursor(), model.model_options.db_table
160 )
161 # No need to create an index for ForeignKeyField fields except if
162 # db_constraint=False because the index from that constraint won't be
163 # created.
164 if (
165 storage == "InnoDB"
166 and isinstance(field, ForeignKeyField)
167 and field.db_constraint
168 ):
169 return False
170 return not self._is_limited_data_type(field)
171
172 def _create_missing_fk_index(
173 self,
174 model: type[Model],
175 *,
176 fields: Sequence[str],
177 expressions: Sequence[Any] | None = None,
178 ) -> None:
179 """
180 MySQL can remove an implicit FK index on a field when that field is
181 covered by another index. "covered" here means
182 that the more complex index has the FK field as its first field (see
183 https://bugs.mysql.com/bug.php?id=37910).
184
185 Manually create an implicit FK index to make it possible to remove the
186 composed index.
187 """
188 first_field_name = None
189 if fields:
190 first_field_name = fields[0]
191 elif (
192 expressions
193 and self.connection.features.supports_expression_indexes
194 and isinstance(expressions[0], F)
195 and LOOKUP_SEP not in expressions[0].name
196 ):
197 first_field_name = expressions[0].name
198
199 if not first_field_name:
200 return
201
202 first_field = model._model_meta.get_forward_field(first_field_name)
203 if first_field.get_internal_type() == "ForeignKeyField":
204 column = self.connection.introspection.identifier_converter(
205 first_field.column
206 )
207 with self.connection.cursor() as cursor:
208 constraint_names = [
209 name
210 for name, infodict in self.connection.introspection.get_constraints(
211 cursor, model.model_options.db_table
212 ).items()
213 if infodict["index"] and infodict["columns"][0] == column
214 ]
215 # There are no other indexes that starts with the FK field, only
216 # the index that is expected to be deleted.
217 if len(constraint_names) == 1:
218 self.execute(
219 self._create_index_sql(model, fields=[first_field], suffix="")
220 )
221
222 def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
223 """
224 Keep the null property of the old field. If it has changed, it will be
225 handled separately.
226 """
227 if field.allow_null:
228 new_type += " NULL"
229 else:
230 new_type += " NOT NULL"
231 return new_type
232
233 def _alter_column_type_sql(
234 self,
235 model: type[Model],
236 old_field: Field,
237 new_field: Field,
238 new_type: str,
239 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
240 new_type = self._set_field_new_type_null_status(old_field, new_type)
241 return super()._alter_column_type_sql(model, old_field, new_field, new_type)
242
243 def _field_db_check(
244 self, field: Field, field_db_params: DbParameters
245 ) -> str | None:
246 if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
247 10,
248 5,
249 2,
250 ):
251 return super()._field_db_check(field, field_db_params)
252 # On MySQL and MariaDB < 10.5.2 (no support for
253 # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
254 # the column name as it requires explicit recreation when the column is
255 # renamed.
256 return field_db_params["check"]
257
258 def _rename_field_sql(
259 self, table: str, old_field: Field, new_field: Field, new_type: str
260 ) -> str:
261 new_type = self._set_field_new_type_null_status(old_field, new_type)
262 return super()._rename_field_sql(table, old_field, new_field, new_type)