1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any
4
5from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
6from plain.models.constants import LOOKUP_SEP
7from plain.models.constraints import UniqueConstraint
8from plain.models.expressions import F
9from plain.models.fields import NOT_PROVIDED
10from plain.models.fields.related import ForeignKeyField
11
12if TYPE_CHECKING:
13 from collections.abc import Sequence
14
15 from plain.models.backends.mysql.base import MySQLDatabaseWrapper
16 from plain.models.base import Model
17 from plain.models.constraints import BaseConstraint
18 from plain.models.fields import Field
19 from plain.models.indexes import Index
20
21
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """Schema editor specialised for MySQL and MariaDB connections.

    Overrides the base editor's SQL templates and hooks with the statements
    used for these servers, and branches on ``connection.mysql_is_mariadb``
    and ``connection.mysql_version`` where the two products or their
    versions need different SQL.
    """

    # Type checker hint: connection is always MySQLDatabaseWrapper in this class
    connection: MySQLDatabaseWrapper

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # No standalone column-comment statement; see _alter_column_comment_sql().
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self) -> str:
        """Return the template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self) -> str:
        """Return the template used to rename a column.

        Uses the base editor's template when the server is new enough,
        otherwise falls back to an ``ALTER TABLE ... CHANGE`` statement.
        """
        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
        # "ALTER TABLE ... RENAME COLUMN" statement.
        minimum_version = (10, 5, 2) if self.connection.mysql_is_mariadb else (8, 0, 4)
        if self.connection.mysql_version >= minimum_version:
            return super().sql_rename_column
        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"

    def quote_value(self, value: Any) -> str:
        """Escape ``value`` with the driver so it can be inlined into SQL.

        String values have ``%`` doubled before escaping, and a ``bytes``
        result for a string input is decoded back to ``str``.
        """
        self.connection.ensure_connection()
        raw_connection = self.connection.connection
        if isinstance(value, str):
            value = value.replace("%", "%%")
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = raw_connection.escape(value, raw_connection.encoders)
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field: Field) -> bool:
        """Return True if the field's column type is in the connection's
        ``_limited_data_types`` collection (compared case-insensitively)."""
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field: Field) -> bool:
        """Return True when the field has a limited data type and the server
        does not support defaults for such types."""
        return (
            not self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        )

    def skip_default_on_alter(self, field: Field) -> bool:
        """Return True when a default must be skipped in ALTER COLUMN."""
        # MySQL doesn't support defaults for BLOB and TEXT in the
        # ALTER COLUMN statement.
        return (
            self._is_limited_data_type(field)
            and not self.connection.mysql_is_mariadb
        )

    @property
    def _supports_limited_data_type_defaults(self) -> bool:
        """Whether the connected server accepts defaults on limited types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field: Field) -> str:
        """Return the placeholder SQL used for the field's default value."""
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model: type[Model], field: Field) -> None:
        """Add the field, then backfill its default when it was skipped.

        When ``skip_default()`` is true for the field and it has a real
        default, an UPDATE sets the new column to the effective default.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            table = self.quote_name(model.model_options.db_table)
            column = self.quote_name(field.column)
            self.execute(
                f"UPDATE {table} SET {column} = %s",
                [effective_default],
            )

    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
        """Remove a constraint, first creating a foreign-key index if needed.

        The index step runs only for a ``UniqueConstraint`` whose
        ``create_sql()`` returns something other than None.
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model: type[Model], index: Index) -> None:
        """Remove an index, first creating a foreign-key index if needed."""
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
        """Return whether a separate index should be created for the field.

        Returns False when the base editor says so, when the field is a
        constrained ``ForeignKeyField`` on an InnoDB table, or when the field
        has a limited data type.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # No need to create an index for ForeignKeyField fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if isinstance(field, ForeignKeyField) and field.db_constraint:
            # The storage engine only matters for constrained foreign keys, so
            # look it up only here, and close the cursor once done.
            with self.connection.cursor() as cursor:
                storage = self.connection.introspection.get_storage_engine(
                    cursor, model.model_options.db_table
                )
            if storage == "InnoDB":
                return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model: type[Model],
        *,
        fields: Sequence[str],
        expressions: Sequence[Any] | None = None,
    ) -> None:
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._model_meta.get_forward_field(first_field_name)
        if first_field.get_internal_type() != "ForeignKeyField":
            return

        column = self.connection.introspection.identifier_converter(
            first_field.column
        )
        with self.connection.cursor() as cursor:
            constraint_names = [
                name
                for name, infodict in self.connection.introspection.get_constraints(
                    cursor, model.model_options.db_table
                ).items()
                # Guard against an index entry that reports no columns.
                if infodict["index"]
                and infodict["columns"]
                and infodict["columns"][0] == column
            ]
        # There are no other indexes that start with the FK field, only
        # the index that is expected to be deleted.
        if len(constraint_names) == 1:
            self.execute(
                self._create_index_sql(model, fields=[first_field], suffix="")
            )

    def _set_field_new_type_null_status(self, field: Field, new_type: str) -> str:
        """
        Keep the null property of the old field. If it has changed, it will be
        handled separately.
        """
        return new_type + (" NULL" if field.allow_null else " NOT NULL")

    def _alter_column_type_sql(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        new_type: str,
        old_collation: str,
        new_collation: str,
    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
        """Build the type-change SQL with the old field's NULL status appended
        to ``new_type`` before delegating to the base editor."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _field_db_check(
        self, field: Field, field_db_params: dict[str, Any]
    ) -> str | None:
        """Return the check-constraint SQL to track for the field."""
        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
            10,
            5,
            2,
        ):
            return super()._field_db_check(field, field_db_params)
        # On MySQL and MariaDB < 10.5.2 (no support for
        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
        # the column name as it requires explicit recreation when the column is
        # renamed.
        return field_db_params["check"]

    def _rename_field_sql(
        self, table: str, old_field: Field, new_field: Field, new_type: str
    ) -> str:
        """Build the rename SQL with the old field's NULL status appended to
        ``new_type`` before delegating to the base editor."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(
        self, model: type[Model], new_field: Field, new_type: str, new_db_comment: str
    ) -> tuple[str, list[Any]]:
        """Return empty SQL and no parameters for a column-comment change."""
        # Comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment: str | None) -> str:
        """Return the base editor's comment SQL prefixed with `` COMMENT ``."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"