1from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
2from plain.models.constants import LOOKUP_SEP
3from plain.models.constraints import UniqueConstraint
4from plain.models.expressions import F
5from plain.models.fields import NOT_PROVIDED
6
7
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """Schema editor for the MySQL/MariaDB backend.

    Overrides the base editor's SQL templates and hooks where this backend
    needs different DDL, branching on ``connection.mysql_is_mariadb`` and
    ``connection.mysql_version`` where the two servers diverge.
    """

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    # Column alterations are written as MODIFY clauses that restate the type.
    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # No standalone column-comment statement: the comment is emitted through
    # the %(comment)s slot of sql_alter_column_type instead.
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self):
        """Return the SQL template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self):
        """Return the SQL template used to rename a column.

        Uses the base editor's template when the server version supports
        "ALTER TABLE ... RENAME COLUMN"; otherwise falls back to a CHANGE
        statement, which has to restate the column type.
        """
        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
        # "ALTER TABLE ... RENAME COLUMN" statement.
        if self.connection.mysql_is_mariadb:
            if self.connection.mysql_version >= (10, 5, 2):
                return super().sql_rename_column
        elif self.connection.mysql_version >= (8, 0, 4):
            return super().sql_rename_column
        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"

    def quote_value(self, value):
        """Return ``value`` escaped by the underlying driver connection.

        String values have every "%" doubled before escaping (presumably so a
        literal percent sign survives later %-style interpolation of the SQL
        -- confirm against callers). A bytes result for a string input is
        decoded back to ``str``.
        """
        self.connection.ensure_connection()
        if isinstance(value, str):
            value = value.replace("%", "%%")
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = self.connection.connection.escape(
            value, self.connection.connection.encoders
        )
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field):
        """Return True if the field's database type is one of the connection's
        ``_limited_data_types`` (compared case-insensitively)."""
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field):
        """Return True if the default must be skipped for ``field``.

        That is the case only for limited data types on servers that do not
        support defaults for them.
        """
        if not self._supports_limited_data_type_defaults:
            return self._is_limited_data_type(field)
        return False

    def skip_default_on_alter(self, field):
        """Return True if the default must be skipped when altering ``field``."""
        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
            # MySQL doesn't support defaults for BLOB and TEXT in the
            # ALTER COLUMN statement.
            return True
        return False

    @property
    def _supports_limited_data_type_defaults(self):
        """Whether the connected server accepts defaults on limited data types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field):
        """Return the placeholder SQL used for the column's default value."""
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model, field):
        """Add ``field`` to ``model``'s table.

        When ``skip_default()`` reports that the default is skipped for this
        field and the field declares a default, the existing rows are
        back-filled with an UPDATE after the column has been added.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            self.execute(
                f"UPDATE {self.quote_name(model._meta.db_table)} SET {self.quote_name(field.column)} = %s",
                [effective_default],
            )

    def remove_constraint(self, model, constraint):
        """Remove ``constraint``, first recreating a missing FK index if needed.

        The FK index check only runs for unique constraints whose
        ``create_sql()`` yields a statement.
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model, index):
        """Remove ``index``, first recreating a missing FK index if needed."""
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model, field):
        """Return True if a separate index should be created for ``field``.

        On top of the base editor's decision, an index is skipped for
        constrained ForeignKey fields on InnoDB tables and for limited data
        types.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # Use the cursor as a context manager so it is closed once the storage
        # engine has been looked up, instead of being left open.
        with self.connection.cursor() as cursor:
            storage = self.connection.introspection.get_storage_engine(
                cursor, model._meta.db_table
            )
        # No need to create an index for ForeignKey fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if (
            storage == "InnoDB"
            and field.get_internal_type() == "ForeignKey"
            and field.db_constraint
        ):
            return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model,
        *,
        fields,
        expressions=None,
    ):
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.

        ``fields`` is the sequence of field names of the index or constraint
        about to be removed; ``expressions`` is consulted only when ``fields``
        is empty, and only if its first item is a plain ``F()`` reference
        without lookups.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._meta.get_field(first_field_name)
        if first_field.get_internal_type() == "ForeignKey":
            column = self.connection.introspection.identifier_converter(
                first_field.column
            )
            with self.connection.cursor() as cursor:
                constraint_names = [
                    name
                    for name, infodict in self.connection.introspection.get_constraints(
                        cursor, model._meta.db_table
                    ).items()
                    if infodict["index"] and infodict["columns"][0] == column
                ]
            # There are no other indexes that start with the FK field, only
            # the index that is expected to be deleted.
            if len(constraint_names) == 1:
                self.execute(
                    self._create_index_sql(model, fields=[first_field], suffix="")
                )

    def _set_field_new_type_null_status(self, field, new_type):
        """
        Keep the null property of the old field. If it has changed, it will be
        handled separately.

        Returns ``new_type`` with " NULL" or " NOT NULL" appended according to
        ``field.allow_null``.
        """
        if field.allow_null:
            new_type += " NULL"
        else:
            new_type += " NOT NULL"
        return new_type

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """Build the type-alteration SQL, carrying over the old field's
        nullability in the restated column type."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _field_db_check(self, field, field_db_params):
        """Return the check constraint definition to track for ``field``."""
        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
            10,
            5,
            2,
        ):
            return super()._field_db_check(field, field_db_params)
        # On MySQL and MariaDB < 10.5.2 (no support for
        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
        # the column name as it requires explicit recreation when the column is
        # renamed.
        return field_db_params["check"]

    def _rename_field_sql(self, table, old_field, new_field, new_type):
        """Build the column-rename SQL, carrying over the old field's
        nullability in the restated column type."""
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
        """Return an empty statement and an empty parameter list."""
        # Comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment):
        """Return the base editor's comment SQL prefixed with " COMMENT "."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"