1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any
4
5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
6from plain.models.constants import LOOKUP_SEP
7from plain.models.constraints import UniqueConstraint
8from plain.models.expressions import F
9from plain.models.fields import NOT_PROVIDED
10
11if TYPE_CHECKING:
12 from collections.abc import Sequence
13
14 from plain.models.backends.mysql.base import MySQLDatabaseWrapper
15 from plain.models.base import Model
16 from plain.models.constraints import BaseConstraint
17 from plain.models.fields import Field
18 from plain.models.indexes import Index
19
20
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """Schema editor for the MySQL backend (MySQL and MariaDB servers).

    Overrides the base editor's SQL templates with MySQL syntax and
    customizes several hooks. Where the two servers differ, behavior is
    selected from ``connection.mysql_is_mariadb`` and
    ``connection.mysql_version``.
    """

    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
    connection: MySQLDatabaseWrapper

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column changes use MODIFY, which restates the full column type.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # No standalone column-comment statement: the comment travels in the
    # %(comment)s slot of sql_alter_column_type instead.
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self) -> str:
        """Return the SQL template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self) -> str:
        """Return the SQL template used to rename a column.

        Uses the base editor's template when the server supports
        "RENAME COLUMN"; otherwise falls back to "CHANGE", which also
        requires the column type.
        """
        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
        # "ALTER TABLE ... RENAME COLUMN" statement.
        if self.connection.mysql_is_mariadb:
            if self.connection.mysql_version >= (10, 5, 2):
                return super().sql_rename_column
        elif self.connection.mysql_version >= (8, 0, 4):
            return super().sql_rename_column
        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"

    def quote_value(self, value: Any) -> str:
        """Return ``value`` escaped by the underlying driver connection.

        The connection is opened first if necessary. Literal ``%``
        characters in string values are doubled before escaping.
        NOTE(review): presumably so the result survives later %-style
        interpolation of the statement -- confirm against callers.
        """
        self.connection.ensure_connection()
        if isinstance(value, str):
            value = value.replace("%", "%%")
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = self.connection.connection.escape(
            value, self.connection.connection.encoders
        )
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field: Field) -> bool:
        """Return True if the field's db type is one of the connection's
        ``_limited_data_types`` (compared case-insensitively)."""
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field: Field) -> bool:
        """Return True if a default must not be emitted for ``field``.

        That is the case only for limited data types on servers that do not
        support defaults for them.
        """
        if not self._supports_limited_data_type_defaults:
            return self._is_limited_data_type(field)
        return False

    def skip_default_on_alter(self, field: Field) -> bool:
        """Return True if a default must not be emitted when altering
        ``field``'s column."""
        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
            # MySQL doesn't support defaults for BLOB and TEXT in the
            # ALTER COLUMN statement.
            return True
        return False

    @property
    def _supports_limited_data_type_defaults(self) -> bool:
        """Whether the server accepts defaults on limited data types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field: Field) -> str:
        """Return the placeholder used for ``field``'s default value."""
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model: type[Model], field: Field) -> None:
        """Add ``field`` to ``model``'s table.

        When the default was skipped (see ``skip_default``) and the field
        has one, the new column is filled with an explicit UPDATE.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            self.execute(
                f"UPDATE {self.quote_name(model.model_options.db_table)} SET {self.quote_name(field.column)} = %s",
                [effective_default],
            )

    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
        """Remove ``constraint`` from ``model``'s table.

        For a unique constraint that produces SQL, a standalone FK index is
        created first if needed (see ``_create_missing_fk_index``).
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model: type[Model], index: Index) -> None:
        """Remove ``index`` from ``model``'s table, first creating a
        standalone FK index if needed (see ``_create_missing_fk_index``)."""
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
        """Return True if an explicit index should be created for ``field``.

        Starts from the base editor's decision, then excludes constrained
        ForeignKey fields on InnoDB tables and limited data types.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # Use the cursor as a context manager so it is closed once the
        # storage engine has been looked up.
        with self.connection.cursor() as cursor:
            storage = self.connection.introspection.get_storage_engine(
                cursor, model.model_options.db_table
            )
        # No need to create an index for ForeignKey fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if (
            storage == "InnoDB"
            and field.get_internal_type() == "ForeignKey"
            and field.db_constraint  # type: ignore[attr-defined]
        ):
            return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model: type[Model],
        *,
        fields: Sequence[str],
        expressions: Sequence[Any] | None = None,
    ) -> None:
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.

        ``fields`` and ``expressions`` describe the index or constraint that
        is about to be removed; only its first member is examined.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            # A plain F("name") expression (no lookup separator) refers
            # directly to a field.
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._model_meta.get_field(first_field_name)
        if first_field.get_internal_type() == "ForeignKey":
            column = self.connection.introspection.identifier_converter(
                first_field.column
            )
            with self.connection.cursor() as cursor:
                # Names of all existing indexes whose first column is the FK
                # column.
                constraint_names = [
                    name
                    for name, infodict in self.connection.introspection.get_constraints(
                        cursor, model.model_options.db_table
                    ).items()
                    if infodict["index"] and infodict["columns"][0] == column
                ]
            # There are no other indexes that starts with the FK field, only
            # the index that is expected to be deleted.
            if len(constraint_names) == 1:
                self.execute(
                    self._create_index_sql(model, fields=[first_field], suffix="")
                )

    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
        """
        Keep the null property of the old field. If it has changed, it will be
        handled separately.

        Returns ``new_type`` with " NULL" or " NOT NULL" appended according
        to ``field.allow_null``.
        """
        if field.allow_null:
            new_type += " NULL"
        else:
            new_type += " NOT NULL"
        return new_type

    def _alter_column_type_sql(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        new_type: str,
        old_collation: str,
        new_collation: str,
    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
        """Build the column type change SQL via the base editor, with the
        old field's NULL / NOT NULL status appended to ``new_type``."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _field_db_check(
        self, field: Field, field_db_params: dict[str, Any]
    ) -> str | None:
        """Return the check constraint for ``field``: the base editor's
        result on MariaDB >= 10.5.2, otherwise ``field_db_params["check"]``."""
        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
            10,
            5,
            2,
        ):
            return super()._field_db_check(field, field_db_params)
        # On MySQL and MariaDB < 10.5.2 (no support for
        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
        # the column name as it requires explicit recreation when the column is
        # renamed.
        return field_db_params["check"]

    def _rename_field_sql(
        self, table: str, old_field: Field, new_field: Field, new_type: str
    ) -> str:
        """Build the column rename SQL via the base editor, with the old
        field's NULL / NOT NULL status appended to ``new_type``."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(
        self, model: type[Model], new_field: Field, new_type: str, new_db_comment: str
    ) -> tuple[str, list[Any]]:
        """Return an empty statement and empty parameter list."""
        # Comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment: str | None) -> str:
        """Return the base editor's comment SQL prefixed with " COMMENT "."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"