1from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
2from plain.models.constants import LOOKUP_SEP
3from plain.models.fields import NOT_PROVIDED, F, UniqueConstraint
4
5
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the MySQL / MariaDB backend.

    Overrides the SQL templates and hooks of ``BaseDatabaseSchemaEditor``
    where this dialect differs, branching on ``connection.mysql_is_mariadb``
    and ``connection.mysql_version`` where the two servers diverge.
    """

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # Column comments are emitted through the %(comment)s slot of
    # sql_alter_column_type, so there is no standalone statement.
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self):
        """Return the SQL template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self):
        """
        Return the SQL template used to rename a column.

        Servers that are new enough use the base class template; older ones
        fall back to a "CHANGE" statement, which also needs the column type.
        """
        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
        # "ALTER TABLE ... RENAME COLUMN" statement.
        if self.connection.mysql_is_mariadb:
            if self.connection.mysql_version >= (10, 5, 2):
                return super().sql_rename_column
        elif self.connection.mysql_version >= (8, 0, 4):
            return super().sql_rename_column
        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"

    def quote_value(self, value):
        """
        Return ``value`` escaped by the driver connection's ``escape()``.

        Percent signs in string values are doubled before escaping
        (presumably so the result survives later %-interpolation of the SQL
        it is embedded in -- confirm against callers).
        """
        self.connection.ensure_connection()
        if isinstance(value, str):
            value = value.replace("%", "%%")
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = self.connection.connection.escape(
            value, self.connection.connection.encoders
        )
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field):
        """
        Return whether the field's database type (compared in lowercase) is
        listed in ``connection._limited_data_types``. A field without a
        database type is never limited.
        """
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field):
        """
        Return True when the field is of a limited data type and the server
        does not support defaults for such types.
        """
        if not self._supports_limited_data_type_defaults:
            return self._is_limited_data_type(field)
        return False

    def skip_default_on_alter(self, field):
        """
        Return True when a default must not be emitted in an ALTER COLUMN
        statement for this field.
        """
        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
            # MySQL doesn't support defaults for BLOB and TEXT in the
            # ALTER COLUMN statement.
            return True
        return False

    @property
    def _supports_limited_data_type_defaults(self):
        """Whether the connected server accepts defaults on limited types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field):
        """
        Return the placeholder used for the field's default value; on MySQL
        it is parenthesized for limited data types, otherwise the base class
        decides.
        """
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model, field):
        """
        Add the field through the base implementation and, when its default
        had to be skipped in the column definition, write the effective
        default into every row of the table with an UPDATE.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            self.execute(
                "UPDATE {table} SET {column} = %s".format(
                    table=self.quote_name(model._meta.db_table),
                    column=self.quote_name(field.column),
                ),
                [effective_default],
            )

    def remove_constraint(self, model, constraint):
        """
        Remove the constraint, first recreating a foreign key index that a
        unique constraint may have been standing in for (see
        ``_create_missing_fk_index``).
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model, index):
        """
        Remove the index, first recreating a foreign key index that it may
        have been standing in for (see ``_create_missing_fk_index``).
        """
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model, field):
        """
        Return whether an explicit index should be created for the field.

        On top of the base class decision, no index is created for
        constrained foreign keys on InnoDB tables nor for fields of a limited
        data type.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # Use the cursor as a context manager so it is closed as soon as the
        # storage engine has been looked up, instead of being left open.
        with self.connection.cursor() as cursor:
            storage = self.connection.introspection.get_storage_engine(
                cursor, model._meta.db_table
            )
        # No need to create an index for ForeignKey fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if (
            storage == "InnoDB"
            and field.get_internal_type() == "ForeignKey"
            and field.db_constraint
        ):
            return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model,
        *,
        fields,
        expressions=None,
    ):
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            # A plain F("name") expression (no lookup separator) refers
            # directly to a model field.
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._meta.get_field(first_field_name)
        if first_field.get_internal_type() == "ForeignKey":
            column = self.connection.introspection.identifier_converter(
                first_field.column
            )
            with self.connection.cursor() as cursor:
                constraint_names = [
                    name
                    for name, infodict in self.connection.introspection.get_constraints(
                        cursor, model._meta.db_table
                    ).items()
                    if infodict["index"] and infodict["columns"][0] == column
                ]
            # There are no other indexes that start with the FK field, only
            # the index that is expected to be deleted.
            if len(constraint_names) == 1:
                self.execute(
                    self._create_index_sql(model, fields=[first_field], suffix="")
                )

    def _set_field_new_type_null_status(self, field, new_type):
        """
        Keep the null property of the old field. If it has changed, it will be
        handled separately.
        """
        if field.null:
            new_type += " NULL"
        else:
            new_type += " NOT NULL"
        return new_type

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """
        Append the old field's NULL / NOT NULL status to ``new_type`` and
        delegate to the base implementation.
        """
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _field_db_check(self, field, field_db_params):
        """
        Return the check constraint for the field: the base class result on
        MariaDB >= 10.5.2, otherwise ``field_db_params["check"]``.
        """
        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
            10,
            5,
            2,
        ):
            return super()._field_db_check(field, field_db_params)
        # On MySQL and MariaDB < 10.5.2 (no support for
        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
        # the column name as it requires explicit recreation when the column is
        # renamed.
        return field_db_params["check"]

    def _rename_field_sql(self, table, old_field, new_field, new_type):
        """
        Append the old field's NULL / NOT NULL status to ``new_type`` and
        delegate to the base implementation.
        """
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
        """Return empty SQL and parameters; no separate statement is issued."""
        # The comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment):
        """Return the base class comment SQL prefixed with " COMMENT "."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"