1import logging
2import operator
3from datetime import datetime
4
5from plain.models.backends.ddl_references import (
6 Columns,
7 Expressions,
8 ForeignKeyName,
9 IndexName,
10 Statement,
11 Table,
12)
13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
14from plain.models.constraints import Deferrable
15from plain.models.indexes import Index
16from plain.models.sql import Query
17from plain.models.transaction import TransactionManagementError, atomic
18from plain.utils import timezone
19
20logger = logging.getLogger("plain.models.backends.schema")
21
22
23def _is_relevant_relation(relation, altered_field):
24 """
25 When altering the given field, must constraints on its model from the given
26 relation be temporarily dropped?
27 """
28 field = relation.field
29 if field.many_to_many:
30 # M2M reverse field
31 return False
32 if altered_field.primary_key and field.to_fields == [None]:
33 # Foreign key constraint on the primary key, which is being altered.
34 return True
35 # Is the constraint targeting the field being altered?
36 return altered_field.name in field.to_fields
37
38
39def _all_related_fields(model):
40 # Related fields must be returned in a deterministic order.
41 return sorted(
42 model._meta._get_fields(
43 forward=False,
44 reverse=True,
45 include_hidden=True,
46 ),
47 key=operator.attrgetter("name"),
48 )
49
50
def _related_non_m2m_objects(old_field, new_field):
    """
    Yield ``(old_relation, new_relation)`` pairs for the relevant reverse
    relations of ``old_field`` and ``new_field``, then recurse into each
    pair's ``remote_field`` so chained relations are yielded as well.

    Relations are paired positionally; relations rejected by
    ``_is_relevant_relation()`` (e.g. M2M reverse relations) are skipped.
    """

    def relevant_relations(field):
        return (
            rel
            for rel in _all_related_fields(field.model)
            if _is_relevant_relation(rel, field)
        )

    pairs = zip(relevant_relations(old_field), relevant_relations(new_field))
    for old_relation, new_relation in pairs:
        yield old_relation, new_relation
        yield from _related_non_m2m_objects(
            old_relation.remote_field,
            new_relation.remote_field,
        )
72
73
class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.
    """

    # Overrideable SQL templates
    #
    # Every template is a %-format string with named placeholders; the
    # methods of this class fill them in with quoted identifiers and
    # pre-rendered SQL fragments.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be substituted into the %(changes)s slot of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Defaults to the same SQL as sql_alter_column_no_default.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    # Used to backfill existing NULL rows with a default value.
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic "drop constraint" statement that
    # the sql_delete_* templates below reuse.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # Inline FK templates are unset here. When sql_create_inline_fk is falsy,
    # table_sql() defers FK creation; when sql_create_column_inline_fk is
    # falsy, add_field() does the same.
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Index statements.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary key statements.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # Table and column comment statements.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
142
143 def __init__(self, connection, collect_sql=False, atomic=True):
144 self.connection = connection
145 self.collect_sql = collect_sql
146 if self.collect_sql:
147 self.collected_sql = []
148 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
149
150 # State-managing methods
151
152 def __enter__(self):
153 self.deferred_sql = []
154 if self.atomic_migration:
155 self.atomic = atomic()
156 self.atomic.__enter__()
157 return self
158
159 def __exit__(self, exc_type, exc_value, traceback):
160 if exc_type is None:
161 for sql in self.deferred_sql:
162 self.execute(sql)
163 if self.atomic_migration:
164 self.atomic.__exit__(exc_type, exc_value, traceback)
165
166 # Core utility functions
167
168 def execute(self, sql, params=()):
169 """Execute the given SQL statement, with optional parameters."""
170 # Don't perform the transactional DDL check if SQL is being collected
171 # as it's not going to be executed anyway.
172 if (
173 not self.collect_sql
174 and self.connection.in_atomic_block
175 and not self.connection.features.can_rollback_ddl
176 ):
177 raise TransactionManagementError(
178 "Executing DDL statements while in a transaction on databases "
179 "that can't perform a rollback is prohibited."
180 )
181 # Account for non-string statement objects.
182 sql = str(sql)
183 # Log the command we're running, then run it
184 logger.debug(
185 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
186 )
187 if self.collect_sql:
188 ending = "" if sql.rstrip().endswith(";") else ";"
189 if params is not None:
190 self.collected_sql.append(
191 (sql % tuple(map(self.quote_value, params))) + ending
192 )
193 else:
194 self.collected_sql.append(sql + ending)
195 else:
196 with self.connection.cursor() as cursor:
197 cursor.execute(sql, params)
198
199 def quote_name(self, name):
200 return self.connection.ops.quote_name(name)
201
    def table_sql(self, model):
        """
        Take a model and return its table definition.

        Return a ``(sql, params)`` pair: the statement rendered from
        ``sql_create_table`` and the parameters collected from the column
        definitions. As a side effect, foreign key constraints that are not
        emitted inline and any autoincrement SQL reported by the backend are
        appended to ``self.deferred_sql``.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            # column_sql() returns no definition for fields without a column.
            if definition is None:
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += f" {col_type_suffix}"
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                if self.sql_create_inline_fk:
                    # The backend provides an inline FK template: extend the
                    # column definition itself.
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    # No inline template: queue the FK constraint as
                    # deferred SQL instead.
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in (
                "AutoField",
                "BigAutoField",
                "SmallAutoField",
            ):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model._meta.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        # Model-level constraints are rendered after the columns; falsy
        # results are dropped when the definition is joined below.
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model._meta.constraints
        ]
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        return sql, params
265
266 # Field <-> database mapping functions
267
    def _iter_column_sql(
        self, column_db_type, params, model, field, field_db_params, include_default
    ):
        """
        Yield the SQL fragments of a column definition, in order: the type,
        an optional collation, an optional inline comment, an optional
        DEFAULT clause, the NULL / NOT NULL marker and, for primary keys,
        PRIMARY KEY.

        ``params`` is mutated in place: when a parametrized DEFAULT clause is
        yielded, the default value is appended to it. ``model`` is accepted
        but not used by this implementation.
        """
        yield column_db_type
        if collation := field_db_params.get("collation"):
            yield self._collate_sql(collation)
        if self.connection.features.supports_comments_inline and field.db_comment:
            yield self._comment_sql(field.db_comment)
        # Work out nullability.
        null = field.allow_null
        # Include a default value, if requested.
        include_default = (
            include_default
            and not self.skip_default(field)
            and
            # Don't include a default value if it's a nullable field and the
            # default cannot be dropped in the ALTER COLUMN statement (e.g.
            # MySQL longtext and longblob).
            not (null and self.skip_default_on_alter(field))
        )
        if include_default:
            default_value = self.effective_default(field)
            # A None default produces no DEFAULT clause at all.
            if default_value is not None:
                column_default = "DEFAULT " + self._column_default_sql(field)
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter (Oracle).
                    # If this is the case, the individual schema backend should
                    # implement prepare_default().
                    yield column_default % self.prepare_default(default_value)
                else:
                    # Keep the placeholder in the SQL and pass the value as a
                    # query parameter.
                    yield column_default
                    params.append(default_value)

        if not null:
            yield "NOT NULL"
        elif not self.connection.features.implied_column_null:
            # Spell out NULL only when the backend does not imply it.
            yield "NULL"

        if field.primary_key:
            yield "PRIMARY KEY"
308
309 def column_sql(self, model, field, include_default=False):
310 """
311 Return the column definition for a field. The field must already have
312 had set_attributes_from_name() called.
313 """
314 # Get the column's type and use that as the basis of the SQL.
315 field_db_params = field.db_parameters(connection=self.connection)
316 column_db_type = field_db_params["type"]
317 # Check for fields that aren't actually columns (e.g. M2M).
318 if column_db_type is None:
319 return None, None
320 params = []
321 return (
322 " ".join(
323 # This appends to the params being returned.
324 self._iter_column_sql(
325 column_db_type,
326 params,
327 model,
328 field,
329 field_db_params,
330 include_default,
331 )
332 ),
333 params,
334 )
335
336 def skip_default(self, field):
337 """
338 Some backends don't accept default values for certain columns types
339 (i.e. MySQL longtext and longblob).
340 """
341 return False
342
343 def skip_default_on_alter(self, field):
344 """
345 Some backends don't accept default values for certain columns types
346 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
347 """
348 return False
349
350 def prepare_default(self, value):
351 """
352 Only used for backends which have requires_literal_defaults feature
353 """
354 raise NotImplementedError(
355 "subclasses of BaseDatabaseSchemaEditor for backends which have "
356 "requires_literal_defaults must provide a prepare_default() method"
357 )
358
359 def _column_default_sql(self, field):
360 """
361 Return the SQL to use in a DEFAULT clause. The resulting string should
362 contain a '%s' placeholder for a default value.
363 """
364 return "%s"
365
366 @staticmethod
367 def _effective_default(field):
368 # This method allows testing its logic without a connection.
369 if field.has_default():
370 default = field.get_default()
371 elif (
372 not field.allow_null and not field.required and field.empty_strings_allowed
373 ):
374 if field.get_internal_type() == "BinaryField":
375 default = b""
376 else:
377 default = ""
378 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
379 internal_type = field.get_internal_type()
380 if internal_type == "DateTimeField":
381 default = timezone.now()
382 else:
383 default = datetime.now()
384 if internal_type == "DateField":
385 default = default.date()
386 elif internal_type == "TimeField":
387 default = default.time()
388 else:
389 default = None
390 return default
391
392 def effective_default(self, field):
393 """Return a field's effective database default value."""
394 return field.get_db_prep_save(self._effective_default(field), self.connection)
395
396 def quote_value(self, value):
397 """
398 Return a quoted version of the value so it's safe to use in an SQL
399 string. This is not safe against injection from user code; it is
400 intended only for use in making SQL scripts or preparing default values
401 for particularly tricky backends (defaults are not user-defined, though,
402 so this is safe).
403 """
404 raise NotImplementedError()
405
406 # Actions
407
408 def create_model(self, model):
409 """
410 Create a table and any accompanying indexes or unique constraints for
411 the given `model`.
412 """
413 sql, params = self.table_sql(model)
414 # Prevent using [] as params, in the case a literal '%' is used in the
415 # definition.
416 self.execute(sql, params or None)
417
418 if self.connection.features.supports_comments:
419 # Add table comment.
420 if model._meta.db_table_comment:
421 self.alter_db_table_comment(model, None, model._meta.db_table_comment)
422 # Add column comments.
423 if not self.connection.features.supports_comments_inline:
424 for field in model._meta.local_fields:
425 if field.db_comment:
426 field_db_params = field.db_parameters(
427 connection=self.connection
428 )
429 field_type = field_db_params["type"]
430 self.execute(
431 *self._alter_column_comment_sql(
432 model, field, field_type, field.db_comment
433 )
434 )
435 # Add any field index (deferred as SQLite _remake_table needs it).
436 self.deferred_sql.extend(self._model_indexes_sql(model))
437
438 def delete_model(self, model):
439 """Delete a model from the database."""
440
441 # Delete the table
442 self.execute(
443 self.sql_delete_table
444 % {
445 "table": self.quote_name(model._meta.db_table),
446 }
447 )
448 # Remove all deferred statements referencing the deleted table.
449 for sql in list(self.deferred_sql):
450 if isinstance(sql, Statement) and sql.references_table(
451 model._meta.db_table
452 ):
453 self.deferred_sql.remove(sql)
454
455 def add_index(self, model, index):
456 """Add an index on a model."""
457 if (
458 index.contains_expressions
459 and not self.connection.features.supports_expression_indexes
460 ):
461 return None
462 # Index.create_sql returns interpolated SQL which makes params=None a
463 # necessity to avoid escaping attempts on execution.
464 self.execute(index.create_sql(model, self), params=None)
465
466 def remove_index(self, model, index):
467 """Remove an index from a model."""
468 if (
469 index.contains_expressions
470 and not self.connection.features.supports_expression_indexes
471 ):
472 return None
473 self.execute(index.remove_sql(model, self))
474
475 def rename_index(self, model, old_index, new_index):
476 if self.connection.features.can_rename_index:
477 self.execute(
478 self._rename_index_sql(model, old_index.name, new_index.name),
479 params=None,
480 )
481 else:
482 self.remove_index(model, old_index)
483 self.add_index(model, new_index)
484
485 def add_constraint(self, model, constraint):
486 """Add a constraint to a model."""
487 sql = constraint.create_sql(model, self)
488 if sql:
489 # Constraint.create_sql returns interpolated SQL which makes
490 # params=None a necessity to avoid escaping attempts on execution.
491 self.execute(sql, params=None)
492
493 def remove_constraint(self, model, constraint):
494 """Remove a constraint from a model."""
495 sql = constraint.remove_sql(model, self)
496 if sql:
497 self.execute(sql)
498
499 def alter_db_table(self, model, old_db_table, new_db_table):
500 """Rename the table a model points to."""
501 if old_db_table == new_db_table or (
502 self.connection.features.ignores_table_name_case
503 and old_db_table.lower() == new_db_table.lower()
504 ):
505 return
506 self.execute(
507 self.sql_rename_table
508 % {
509 "old_table": self.quote_name(old_db_table),
510 "new_table": self.quote_name(new_db_table),
511 }
512 )
513 # Rename all references to the old table name.
514 for sql in self.deferred_sql:
515 if isinstance(sql, Statement):
516 sql.rename_table_references(old_db_table, new_db_table)
517
518 def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
519 self.execute(
520 self.sql_alter_table_comment
521 % {
522 "table": self.quote_name(model._meta.db_table),
523 "comment": self.quote_value(new_db_table_comment or ""),
524 }
525 )
526
527 def add_field(self, model, field):
528 """
529 Create a field on a model. Usually involves adding a column, but may
530 involve adding a table instead (for M2M fields).
531 """
532 # Get the column's definition
533 definition, params = self.column_sql(model, field, include_default=True)
534 # It might not actually have a column behind it
535 if definition is None:
536 return
537 if col_type_suffix := field.db_type_suffix(connection=self.connection):
538 definition += f" {col_type_suffix}"
539 # Check constraints can go on the column SQL here
540 db_params = field.db_parameters(connection=self.connection)
541 if db_params["check"]:
542 definition += " " + self.sql_check_constraint % db_params
543 if (
544 field.remote_field
545 and self.connection.features.supports_foreign_keys
546 and field.db_constraint
547 ):
548 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
549 # Add FK constraint inline, if supported.
550 if self.sql_create_column_inline_fk:
551 to_table = field.remote_field.model._meta.db_table
552 to_column = field.remote_field.model._meta.get_field(
553 field.remote_field.field_name
554 ).column
555 namespace, _ = split_identifier(model._meta.db_table)
556 definition += " " + self.sql_create_column_inline_fk % {
557 "name": self._fk_constraint_name(model, field, constraint_suffix),
558 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
559 "column": self.quote_name(field.column),
560 "to_table": self.quote_name(to_table),
561 "to_column": self.quote_name(to_column),
562 "deferrable": self.connection.ops.deferrable_sql(),
563 }
564 # Otherwise, add FK constraints later.
565 else:
566 self.deferred_sql.append(
567 self._create_fk_sql(model, field, constraint_suffix)
568 )
569 # Build the SQL and run it
570 sql = self.sql_create_column % {
571 "table": self.quote_name(model._meta.db_table),
572 "column": self.quote_name(field.column),
573 "definition": definition,
574 }
575 self.execute(sql, params)
576 # Drop the default if we need to
577 # (Plain usually does not use in-database defaults)
578 if (
579 not self.skip_default_on_alter(field)
580 and self.effective_default(field) is not None
581 ):
582 changes_sql, params = self._alter_column_default_sql(
583 model, None, field, drop=True
584 )
585 sql = self.sql_alter_column % {
586 "table": self.quote_name(model._meta.db_table),
587 "changes": changes_sql,
588 }
589 self.execute(sql, params)
590 # Add field comment, if required.
591 if (
592 field.db_comment
593 and self.connection.features.supports_comments
594 and not self.connection.features.supports_comments_inline
595 ):
596 field_type = db_params["type"]
597 self.execute(
598 *self._alter_column_comment_sql(
599 model, field, field_type, field.db_comment
600 )
601 )
602 # Add an index, if required
603 self.deferred_sql.extend(self._field_indexes_sql(model, field))
604 # Reset connection if required
605 if self.connection.features.connection_persists_old_columns:
606 self.connection.close()
607
608 def remove_field(self, model, field):
609 """
610 Remove a field from a model. Usually involves deleting a column,
611 but for M2Ms may involve deleting a table.
612 """
613 # It might not actually have a column behind it
614 if field.db_parameters(connection=self.connection)["type"] is None:
615 return
616 # Drop any FK constraints, MySQL requires explicit deletion
617 if field.remote_field:
618 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
619 for fk_name in fk_names:
620 self.execute(self._delete_fk_sql(model, fk_name))
621 # Delete the column
622 sql = self.sql_delete_column % {
623 "table": self.quote_name(model._meta.db_table),
624 "column": self.quote_name(field.column),
625 }
626 self.execute(sql)
627 # Reset connection if required
628 if self.connection.features.connection_persists_old_columns:
629 self.connection.close()
630 # Remove all deferred statements referencing the deleted column.
631 for sql in list(self.deferred_sql):
632 if isinstance(sql, Statement) and sql.references_column(
633 model._meta.db_table, field.column
634 ):
635 self.deferred_sql.remove(sql)
636
637 def alter_field(self, model, old_field, new_field, strict=False):
638 """
639 Allow a field's type, uniqueness, nullability, default, column,
640 constraints, etc. to be modified.
641 `old_field` is required to compute the necessary changes.
642 If `strict` is True, raise errors if the old column does not match
643 `old_field` precisely.
644 """
645 if not self._field_should_be_altered(old_field, new_field):
646 return
647 # Ensure this field is even column-based
648 old_db_params = old_field.db_parameters(connection=self.connection)
649 old_type = old_db_params["type"]
650 new_db_params = new_field.db_parameters(connection=self.connection)
651 new_type = new_db_params["type"]
652 if (old_type is None and old_field.remote_field is None) or (
653 new_type is None and new_field.remote_field is None
654 ):
655 raise ValueError(
656 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
657 "db_type (are you using a badly-written custom field?)",
658 )
659 elif (
660 old_type is None
661 and new_type is None
662 and (old_field.remote_field.through and new_field.remote_field.through)
663 ):
664 # Both sides have through models; this is a no-op.
665 return
666 elif old_type is None or new_type is None:
667 raise ValueError(
668 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
669 "(you cannot alter to or from M2M fields, or add or remove "
670 "through= on M2M fields)"
671 )
672
673 self._alter_field(
674 model,
675 old_field,
676 new_field,
677 old_type,
678 new_type,
679 old_db_params,
680 new_db_params,
681 strict,
682 )
683
684 def _field_db_check(self, field, field_db_params):
685 # Always check constraints with the same mocked column name to avoid
686 # recreating constrains when the column is renamed.
687 check_constraints = self.connection.data_type_check_constraints
688 data = field.db_type_parameters(self.connection)
689 data["column"] = "__column_name__"
690 try:
691 return check_constraints[field.get_internal_type()] % data
692 except KeyError:
693 return None
694
695 def _alter_field(
696 self,
697 model,
698 old_field,
699 new_field,
700 old_type,
701 new_type,
702 old_db_params,
703 new_db_params,
704 strict=False,
705 ):
706 """Perform a "physical" (non-ManyToMany) field update."""
707 # Drop any FK constraints, we'll remake them later
708 fks_dropped = set()
709 if (
710 self.connection.features.supports_foreign_keys
711 and old_field.remote_field
712 and old_field.db_constraint
713 and self._field_should_be_altered(
714 old_field,
715 new_field,
716 ignore={"db_comment"},
717 )
718 ):
719 fk_names = self._constraint_names(
720 model, [old_field.column], foreign_key=True
721 )
722 if strict and len(fk_names) != 1:
723 raise ValueError(
724 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model._meta.db_table}.{old_field.column}"
725 )
726 for fk_name in fk_names:
727 fks_dropped.add((old_field.column,))
728 self.execute(self._delete_fk_sql(model, fk_name))
729 # Has unique been removed?
730 if old_field.primary_key and (
731 not new_field.primary_key
732 or self._field_became_primary_key(old_field, new_field)
733 ):
734 # Find the unique constraint for this field
735 meta_constraint_names = {
736 constraint.name for constraint in model._meta.constraints
737 }
738 constraint_names = self._constraint_names(
739 model,
740 [old_field.column],
741 unique=True,
742 primary_key=False,
743 exclude=meta_constraint_names,
744 )
745 if strict and len(constraint_names) != 1:
746 raise ValueError(
747 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model._meta.db_table}.{old_field.column}"
748 )
749 for constraint_name in constraint_names:
750 self.execute(self._delete_unique_sql(model, constraint_name))
751 # Drop incoming FK constraints if the field is a primary key or unique,
752 # which might be a to_field target, and things are going to change.
753 old_collation = old_db_params.get("collation")
754 new_collation = new_db_params.get("collation")
755 drop_foreign_keys = (
756 self.connection.features.supports_foreign_keys
757 and (old_field.primary_key and new_field.primary_key)
758 and ((old_type != new_type) or (old_collation != new_collation))
759 )
760 if drop_foreign_keys:
761 # '_meta.related_field' also contains M2M reverse fields, these
762 # will be filtered out
763 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
764 rel_fk_names = self._constraint_names(
765 new_rel.related_model, [new_rel.field.column], foreign_key=True
766 )
767 for fk_name in rel_fk_names:
768 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
769 # Removed an index? (no strict check, as multiple indexes are possible)
770 # Remove indexes if db_index switched to False or a unique constraint
771 # will now be used in lieu of an index. The following lines from the
772 # truth table show all True cases; the rest are False:
773 #
774 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
775 # ------------------------------------------------------------------------------
776 # True | False | False | False
777 # True | False | False | True
778 # True | False | True | True
779 if (
780 (old_field.remote_field and old_field.db_index)
781 and not old_field.primary_key
782 and (
783 not (new_field.remote_field and new_field.db_index)
784 or new_field.primary_key
785 )
786 ):
787 # Find the index for this field
788 meta_index_names = {index.name for index in model._meta.indexes}
789 # Retrieve only BTREE indexes since this is what's created with
790 # db_index=True.
791 index_names = self._constraint_names(
792 model,
793 [old_field.column],
794 index=True,
795 type_=Index.suffix,
796 exclude=meta_index_names,
797 )
798 for index_name in index_names:
799 # The only way to check if an index was created with
800 # db_index=True or with Index(['field'], name='foo')
801 # is to look at its name (refs #28053).
802 self.execute(self._delete_index_sql(model, index_name))
803 # Change check constraints?
804 old_db_check = self._field_db_check(old_field, old_db_params)
805 new_db_check = self._field_db_check(new_field, new_db_params)
806 if old_db_check != new_db_check and old_db_check:
807 meta_constraint_names = {
808 constraint.name for constraint in model._meta.constraints
809 }
810 constraint_names = self._constraint_names(
811 model,
812 [old_field.column],
813 check=True,
814 exclude=meta_constraint_names,
815 )
816 if strict and len(constraint_names) != 1:
817 raise ValueError(
818 f"Found wrong number ({len(constraint_names)}) of check constraints for {model._meta.db_table}.{old_field.column}"
819 )
820 for constraint_name in constraint_names:
821 self.execute(self._delete_check_sql(model, constraint_name))
822 # Have they renamed the column?
823 if old_field.column != new_field.column:
824 self.execute(
825 self._rename_field_sql(
826 model._meta.db_table, old_field, new_field, new_type
827 )
828 )
829 # Rename all references to the renamed column.
830 for sql in self.deferred_sql:
831 if isinstance(sql, Statement):
832 sql.rename_column_references(
833 model._meta.db_table, old_field.column, new_field.column
834 )
835 # Next, start accumulating actions to do
836 actions = []
837 null_actions = []
838 post_actions = []
839 # Type suffix change? (e.g. auto increment).
840 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
841 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
842 # Type, collation, or comment change?
843 if (
844 old_type != new_type
845 or old_type_suffix != new_type_suffix
846 or old_collation != new_collation
847 or (
848 self.connection.features.supports_comments
849 and old_field.db_comment != new_field.db_comment
850 )
851 ):
852 fragment, other_actions = self._alter_column_type_sql(
853 model, old_field, new_field, new_type, old_collation, new_collation
854 )
855 actions.append(fragment)
856 post_actions.extend(other_actions)
857 # When changing a column NULL constraint to NOT NULL with a given
858 # default value, we need to perform 4 steps:
859 # 1. Add a default for new incoming writes
860 # 2. Update existing NULL rows with new default
861 # 3. Replace NULL constraint with NOT NULL
862 # 4. Drop the default again.
863 # Default change?
864 needs_database_default = False
865 if old_field.allow_null and not new_field.allow_null:
866 old_default = self.effective_default(old_field)
867 new_default = self.effective_default(new_field)
868 if (
869 not self.skip_default_on_alter(new_field)
870 and old_default != new_default
871 and new_default is not None
872 ):
873 needs_database_default = True
874 actions.append(
875 self._alter_column_default_sql(model, old_field, new_field)
876 )
877 # Nullability change?
878 if old_field.allow_null != new_field.allow_null:
879 fragment = self._alter_column_null_sql(model, old_field, new_field)
880 if fragment:
881 null_actions.append(fragment)
882 # Only if we have a default and there is a change from NULL to NOT NULL
883 four_way_default_alteration = new_field.has_default() and (
884 old_field.allow_null and not new_field.allow_null
885 )
886 if actions or null_actions:
887 if not four_way_default_alteration:
888 # If we don't have to do a 4-way default alteration we can
889 # directly run a (NOT) NULL alteration
890 actions += null_actions
891 # Combine actions together if we can (e.g. postgres)
892 if self.connection.features.supports_combined_alters and actions:
893 sql, params = tuple(zip(*actions))
894 actions = [(", ".join(sql), sum(params, []))]
895 # Apply those actions
896 for sql, params in actions:
897 self.execute(
898 self.sql_alter_column
899 % {
900 "table": self.quote_name(model._meta.db_table),
901 "changes": sql,
902 },
903 params,
904 )
905 if four_way_default_alteration:
906 # Update existing rows with default value
907 self.execute(
908 self.sql_update_with_default
909 % {
910 "table": self.quote_name(model._meta.db_table),
911 "column": self.quote_name(new_field.column),
912 "default": "%s",
913 },
914 [new_default],
915 )
916 # Since we didn't run a NOT NULL change before we need to do it
917 # now
918 for sql, params in null_actions:
919 self.execute(
920 self.sql_alter_column
921 % {
922 "table": self.quote_name(model._meta.db_table),
923 "changes": sql,
924 },
925 params,
926 )
927 if post_actions:
928 for sql, params in post_actions:
929 self.execute(sql, params)
930 # If primary_key changed to False, delete the primary key constraint.
931 if old_field.primary_key and not new_field.primary_key:
932 self._delete_primary_key(model, strict)
933
934 # Added an index? Add an index if db_index switched to True or a unique
935 # constraint will no longer be used in lieu of an index. The following
936 # lines from the truth table show all True cases; the rest are False:
937 #
938 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
939 # ------------------------------------------------------------------------------
940 # False | False | True | False
941 # False | True | True | False
942 # True | True | True | False
943 if (
944 (
945 not (old_field.remote_field and old_field.db_index)
946 or old_field.primary_key
947 )
948 and (new_field.remote_field and new_field.db_index)
949 and not new_field.primary_key
950 ):
951 self.execute(self._create_index_sql(model, fields=[new_field]))
952 # Type alteration on primary key? Then we need to alter the column
953 # referring to us.
954 rels_to_update = []
955 if drop_foreign_keys:
956 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
957 # Changed to become primary key?
958 if self._field_became_primary_key(old_field, new_field):
959 # Make the new one
960 self.execute(self._create_primary_key_sql(model, new_field))
961 # Update all referencing columns
962 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
963 # Handle our type alters on the other end of rels from the PK stuff above
964 for old_rel, new_rel in rels_to_update:
965 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
966 rel_type = rel_db_params["type"]
967 rel_collation = rel_db_params.get("collation")
968 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
969 old_rel_collation = old_rel_db_params.get("collation")
970 fragment, other_actions = self._alter_column_type_sql(
971 new_rel.related_model,
972 old_rel.field,
973 new_rel.field,
974 rel_type,
975 old_rel_collation,
976 rel_collation,
977 )
978 self.execute(
979 self.sql_alter_column
980 % {
981 "table": self.quote_name(new_rel.related_model._meta.db_table),
982 "changes": fragment[0],
983 },
984 fragment[1],
985 )
986 for sql, params in other_actions:
987 self.execute(sql, params)
988 # Does it have a foreign key?
989 if (
990 self.connection.features.supports_foreign_keys
991 and new_field.remote_field
992 and (
993 fks_dropped or not old_field.remote_field or not old_field.db_constraint
994 )
995 and new_field.db_constraint
996 ):
997 self.execute(
998 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
999 )
1000 # Rebuild FKs that pointed to us if we previously had to drop them
1001 if drop_foreign_keys:
1002 for _, rel in rels_to_update:
1003 if rel.field.db_constraint:
1004 self.execute(
1005 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1006 )
1007 # Does it have check constraints we need to add?
1008 if old_db_check != new_db_check and new_db_check:
1009 constraint_name = self._create_index_name(
1010 model._meta.db_table, [new_field.column], suffix="_check"
1011 )
1012 self.execute(
1013 self._create_check_sql(model, constraint_name, new_db_params["check"])
1014 )
1015 # Drop the default if we need to
1016 # (Plain usually does not use in-database defaults)
1017 if needs_database_default:
1018 changes_sql, params = self._alter_column_default_sql(
1019 model, old_field, new_field, drop=True
1020 )
1021 sql = self.sql_alter_column % {
1022 "table": self.quote_name(model._meta.db_table),
1023 "changes": changes_sql,
1024 }
1025 self.execute(sql, params)
1026 # Reset connection if required
1027 if self.connection.features.connection_persists_old_columns:
1028 self.connection.close()
1029
1030 def _alter_column_null_sql(self, model, old_field, new_field):
1031 """
1032 Hook to specialize column null alteration.
1033
1034 Return a (sql, params) fragment to set a column to null or non-null
1035 as required by new_field, or None if no changes are required.
1036 """
1037 new_db_params = new_field.db_parameters(connection=self.connection)
1038 sql = (
1039 self.sql_alter_column_null
1040 if new_field.allow_null
1041 else self.sql_alter_column_not_null
1042 )
1043 return (
1044 sql
1045 % {
1046 "column": self.quote_name(new_field.column),
1047 "type": new_db_params["type"],
1048 },
1049 [],
1050 )
1051
1052 def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1053 """
1054 Hook to specialize column default alteration.
1055
1056 Return a (sql, params) fragment to add or drop (depending on the drop
1057 argument) a default to new_field's column.
1058 """
1059 new_default = self.effective_default(new_field)
1060 default = self._column_default_sql(new_field)
1061 params = [new_default]
1062
1063 if drop:
1064 params = []
1065 elif self.connection.features.requires_literal_defaults:
1066 # Some databases (Oracle) can't take defaults as a parameter
1067 # If this is the case, the SchemaEditor for that database should
1068 # implement prepare_default().
1069 default = self.prepare_default(new_default)
1070 params = []
1071
1072 new_db_params = new_field.db_parameters(connection=self.connection)
1073 if drop:
1074 if new_field.allow_null:
1075 sql = self.sql_alter_column_no_default_null
1076 else:
1077 sql = self.sql_alter_column_no_default
1078 else:
1079 sql = self.sql_alter_column_default
1080 return (
1081 sql
1082 % {
1083 "column": self.quote_name(new_field.column),
1084 "type": new_db_params["type"],
1085 "default": default,
1086 },
1087 params,
1088 )
1089
1090 def _alter_column_type_sql(
1091 self, model, old_field, new_field, new_type, old_collation, new_collation
1092 ):
1093 """
1094 Hook to specialize column type alteration for different backends,
1095 for cases when a creation type is different to an alteration type
1096 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1097
1098 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1099 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1100 run once the field is altered.
1101 """
1102 other_actions = []
1103 if collate_sql := self._collate_sql(
1104 new_collation, old_collation, model._meta.db_table
1105 ):
1106 collate_sql = f" {collate_sql}"
1107 else:
1108 collate_sql = ""
1109 # Comment change?
1110 comment_sql = ""
1111 if self.connection.features.supports_comments and not new_field.many_to_many:
1112 if old_field.db_comment != new_field.db_comment:
1113 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1114 # 'COMMENT ON ...' at the same time.
1115 sql, params = self._alter_column_comment_sql(
1116 model, new_field, new_type, new_field.db_comment
1117 )
1118 if sql:
1119 other_actions.append((sql, params))
1120 if new_field.db_comment:
1121 comment_sql = self._comment_sql(new_field.db_comment)
1122 return (
1123 (
1124 self.sql_alter_column_type
1125 % {
1126 "column": self.quote_name(new_field.column),
1127 "type": new_type,
1128 "collation": collate_sql,
1129 "comment": comment_sql,
1130 },
1131 [],
1132 ),
1133 other_actions,
1134 )
1135
1136 def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1137 return (
1138 self.sql_alter_column_comment
1139 % {
1140 "table": self.quote_name(model._meta.db_table),
1141 "column": self.quote_name(new_field.column),
1142 "comment": self._comment_sql(new_db_comment),
1143 },
1144 [],
1145 )
1146
1147 def _comment_sql(self, comment):
1148 return self.quote_value(comment or "")
1149
1150 def _alter_many_to_many(self, model, old_field, new_field, strict):
1151 """Alter M2Ms to repoint their to= endpoints."""
1152 # Rename the through table
1153 if (
1154 old_field.remote_field.through._meta.db_table
1155 != new_field.remote_field.through._meta.db_table
1156 ):
1157 self.alter_db_table(
1158 old_field.remote_field.through,
1159 old_field.remote_field.through._meta.db_table,
1160 new_field.remote_field.through._meta.db_table,
1161 )
1162 # Repoint the FK to the other side
1163 self.alter_field(
1164 new_field.remote_field.through,
1165 # The field that points to the target model is needed, so we can
1166 # tell alter_field to change it - this is m2m_reverse_field_name()
1167 # (as opposed to m2m_field_name(), which points to our model).
1168 old_field.remote_field.through._meta.get_field(
1169 old_field.m2m_reverse_field_name()
1170 ),
1171 new_field.remote_field.through._meta.get_field(
1172 new_field.m2m_reverse_field_name()
1173 ),
1174 )
1175 self.alter_field(
1176 new_field.remote_field.through,
1177 # for self-referential models we need to alter field from the other end too
1178 old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1179 new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1180 )
1181
1182 def _create_index_name(self, table_name, column_names, suffix=""):
1183 """
1184 Generate a unique name for an index/unique constraint.
1185
1186 The name is divided into 3 parts: the table name, the column names,
1187 and a unique digest and suffix.
1188 """
1189 _, table_name = split_identifier(table_name)
1190 hash_suffix_part = (
1191 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1192 )
1193 max_length = self.connection.ops.max_name_length() or 200
1194 # If everything fits into max_length, use that name.
1195 index_name = "{}_{}_{}".format(
1196 table_name, "_".join(column_names), hash_suffix_part
1197 )
1198 if len(index_name) <= max_length:
1199 return index_name
1200 # Shorten a long suffix.
1201 if len(hash_suffix_part) > max_length / 3:
1202 hash_suffix_part = hash_suffix_part[: max_length // 3]
1203 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1204 index_name = "{}_{}_{}".format(
1205 table_name[:other_length],
1206 "_".join(column_names)[:other_length],
1207 hash_suffix_part,
1208 )
1209 # Prepend D if needed to prevent the name from starting with an
1210 # underscore or a number (not permitted on Oracle).
1211 if index_name[0] == "_" or index_name[0].isdigit():
1212 index_name = f"D{index_name[:-1]}"
1213 return index_name
1214
1215 def _index_condition_sql(self, condition):
1216 if condition:
1217 return " WHERE " + condition
1218 return ""
1219
1220 def _index_include_sql(self, model, columns):
1221 if not columns or not self.connection.features.supports_covering_indexes:
1222 return ""
1223 return Statement(
1224 " INCLUDE (%(columns)s)",
1225 columns=Columns(model._meta.db_table, columns, self.quote_name),
1226 )
1227
1228 def _create_index_sql(
1229 self,
1230 model,
1231 *,
1232 fields=None,
1233 name=None,
1234 suffix="",
1235 using="",
1236 col_suffixes=(),
1237 sql=None,
1238 opclasses=(),
1239 condition=None,
1240 include=None,
1241 expressions=None,
1242 ):
1243 """
1244 Return the SQL statement to create the index for one or several fields
1245 or expressions. `sql` can be specified if the syntax differs from the
1246 standard (GIS indexes, ...).
1247 """
1248 fields = fields or []
1249 expressions = expressions or []
1250 compiler = Query(model, alias_cols=False).get_compiler()
1251 columns = [field.column for field in fields]
1252 sql_create_index = sql or self.sql_create_index
1253 table = model._meta.db_table
1254
1255 def create_index_name(*args, **kwargs):
1256 nonlocal name
1257 if name is None:
1258 name = self._create_index_name(*args, **kwargs)
1259 return self.quote_name(name)
1260
1261 return Statement(
1262 sql_create_index,
1263 table=Table(table, self.quote_name),
1264 name=IndexName(table, columns, suffix, create_index_name),
1265 using=using,
1266 columns=(
1267 self._index_columns(table, columns, col_suffixes, opclasses)
1268 if columns
1269 else Expressions(table, expressions, compiler, self.quote_value)
1270 ),
1271 extra="",
1272 condition=self._index_condition_sql(condition),
1273 include=self._index_include_sql(model, include),
1274 )
1275
1276 def _delete_index_sql(self, model, name, sql=None):
1277 return Statement(
1278 sql or self.sql_delete_index,
1279 table=Table(model._meta.db_table, self.quote_name),
1280 name=self.quote_name(name),
1281 )
1282
1283 def _rename_index_sql(self, model, old_name, new_name):
1284 return Statement(
1285 self.sql_rename_index,
1286 table=Table(model._meta.db_table, self.quote_name),
1287 old_name=self.quote_name(old_name),
1288 new_name=self.quote_name(new_name),
1289 )
1290
1291 def _index_columns(self, table, columns, col_suffixes, opclasses):
1292 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1293
1294 def _model_indexes_sql(self, model):
1295 """
1296 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1297 """
1298 output = []
1299 for field in model._meta.local_fields:
1300 output.extend(self._field_indexes_sql(model, field))
1301
1302 for index in model._meta.indexes:
1303 if (
1304 not index.contains_expressions
1305 or self.connection.features.supports_expression_indexes
1306 ):
1307 output.append(index.create_sql(model, self))
1308 return output
1309
1310 def _field_indexes_sql(self, model, field):
1311 """
1312 Return a list of all index SQL statements for the specified field.
1313 """
1314 output = []
1315 if self._field_should_be_indexed(model, field):
1316 output.append(self._create_index_sql(model, fields=[field]))
1317 return output
1318
1319 def _field_should_be_altered(self, old_field, new_field, ignore=None):
1320 ignore = ignore or set()
1321 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1322 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1323 # Don't alter when:
1324 # - changing only a field name
1325 # - changing an attribute that doesn't affect the schema
1326 # - changing an attribute in the provided set of ignored attributes
1327 # - adding only a db_column and the column name is not changed
1328 for attr in ignore.union(old_field.non_db_attrs):
1329 old_kwargs.pop(attr, None)
1330 for attr in ignore.union(new_field.non_db_attrs):
1331 new_kwargs.pop(attr, None)
1332 return self.quote_name(old_field.column) != self.quote_name(
1333 new_field.column
1334 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1335
1336 def _field_should_be_indexed(self, model, field):
1337 return (field.remote_field and field.db_index) and not field.primary_key
1338
1339 def _field_became_primary_key(self, old_field, new_field):
1340 return not old_field.primary_key and new_field.primary_key
1341
1342 def _rename_field_sql(self, table, old_field, new_field, new_type):
1343 return self.sql_rename_column % {
1344 "table": self.quote_name(table),
1345 "old_column": self.quote_name(old_field.column),
1346 "new_column": self.quote_name(new_field.column),
1347 "type": new_type,
1348 }
1349
1350 def _create_fk_sql(self, model, field, suffix):
1351 table = Table(model._meta.db_table, self.quote_name)
1352 name = self._fk_constraint_name(model, field, suffix)
1353 column = Columns(model._meta.db_table, [field.column], self.quote_name)
1354 to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1355 to_column = Columns(
1356 field.target_field.model._meta.db_table,
1357 [field.target_field.column],
1358 self.quote_name,
1359 )
1360 deferrable = self.connection.ops.deferrable_sql()
1361 return Statement(
1362 self.sql_create_fk,
1363 table=table,
1364 name=name,
1365 column=column,
1366 to_table=to_table,
1367 to_column=to_column,
1368 deferrable=deferrable,
1369 )
1370
1371 def _fk_constraint_name(self, model, field, suffix):
1372 def create_fk_name(*args, **kwargs):
1373 return self.quote_name(self._create_index_name(*args, **kwargs))
1374
1375 return ForeignKeyName(
1376 model._meta.db_table,
1377 [field.column],
1378 split_identifier(field.target_field.model._meta.db_table)[1],
1379 [field.target_field.column],
1380 suffix,
1381 create_fk_name,
1382 )
1383
1384 def _delete_fk_sql(self, model, name):
1385 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1386
1387 def _deferrable_constraint_sql(self, deferrable):
1388 if deferrable is None:
1389 return ""
1390 if deferrable == Deferrable.DEFERRED:
1391 return " DEFERRABLE INITIALLY DEFERRED"
1392 if deferrable == Deferrable.IMMEDIATE:
1393 return " DEFERRABLE INITIALLY IMMEDIATE"
1394
1395 def _unique_sql(
1396 self,
1397 model,
1398 fields,
1399 name,
1400 condition=None,
1401 deferrable=None,
1402 include=None,
1403 opclasses=None,
1404 expressions=None,
1405 ):
1406 if (
1407 deferrable
1408 and not self.connection.features.supports_deferrable_unique_constraints
1409 ):
1410 return None
1411 if condition or include or opclasses or expressions:
1412 # Databases support conditional, covering, and functional unique
1413 # constraints via a unique index.
1414 sql = self._create_unique_sql(
1415 model,
1416 fields,
1417 name=name,
1418 condition=condition,
1419 include=include,
1420 opclasses=opclasses,
1421 expressions=expressions,
1422 )
1423 if sql:
1424 self.deferred_sql.append(sql)
1425 return None
1426 constraint = self.sql_unique_constraint % {
1427 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1428 "deferrable": self._deferrable_constraint_sql(deferrable),
1429 }
1430 return self.sql_constraint % {
1431 "name": self.quote_name(name),
1432 "constraint": constraint,
1433 }
1434
1435 def _create_unique_sql(
1436 self,
1437 model,
1438 fields,
1439 name=None,
1440 condition=None,
1441 deferrable=None,
1442 include=None,
1443 opclasses=None,
1444 expressions=None,
1445 ):
1446 if (
1447 (
1448 deferrable
1449 and not self.connection.features.supports_deferrable_unique_constraints
1450 )
1451 or (condition and not self.connection.features.supports_partial_indexes)
1452 or (include and not self.connection.features.supports_covering_indexes)
1453 or (
1454 expressions and not self.connection.features.supports_expression_indexes
1455 )
1456 ):
1457 return None
1458
1459 compiler = Query(model, alias_cols=False).get_compiler()
1460 table = model._meta.db_table
1461 columns = [field.column for field in fields]
1462 if name is None:
1463 name = self._unique_constraint_name(table, columns, quote=True)
1464 else:
1465 name = self.quote_name(name)
1466 if condition or include or opclasses or expressions:
1467 sql = self.sql_create_unique_index
1468 else:
1469 sql = self.sql_create_unique
1470 if columns:
1471 columns = self._index_columns(
1472 table, columns, col_suffixes=(), opclasses=opclasses
1473 )
1474 else:
1475 columns = Expressions(table, expressions, compiler, self.quote_value)
1476 return Statement(
1477 sql,
1478 table=Table(table, self.quote_name),
1479 name=name,
1480 columns=columns,
1481 condition=self._index_condition_sql(condition),
1482 deferrable=self._deferrable_constraint_sql(deferrable),
1483 include=self._index_include_sql(model, include),
1484 )
1485
1486 def _unique_constraint_name(self, table, columns, quote=True):
1487 if quote:
1488
1489 def create_unique_name(*args, **kwargs):
1490 return self.quote_name(self._create_index_name(*args, **kwargs))
1491
1492 else:
1493 create_unique_name = self._create_index_name
1494
1495 return IndexName(table, columns, "_uniq", create_unique_name)
1496
1497 def _delete_unique_sql(
1498 self,
1499 model,
1500 name,
1501 condition=None,
1502 deferrable=None,
1503 include=None,
1504 opclasses=None,
1505 expressions=None,
1506 ):
1507 if (
1508 (
1509 deferrable
1510 and not self.connection.features.supports_deferrable_unique_constraints
1511 )
1512 or (condition and not self.connection.features.supports_partial_indexes)
1513 or (include and not self.connection.features.supports_covering_indexes)
1514 or (
1515 expressions and not self.connection.features.supports_expression_indexes
1516 )
1517 ):
1518 return None
1519 if condition or include or opclasses or expressions:
1520 sql = self.sql_delete_index
1521 else:
1522 sql = self.sql_delete_unique
1523 return self._delete_constraint_sql(sql, model, name)
1524
1525 def _check_sql(self, name, check):
1526 return self.sql_constraint % {
1527 "name": self.quote_name(name),
1528 "constraint": self.sql_check_constraint % {"check": check},
1529 }
1530
1531 def _create_check_sql(self, model, name, check):
1532 if not self.connection.features.supports_table_check_constraints:
1533 return None
1534 return Statement(
1535 self.sql_create_check,
1536 table=Table(model._meta.db_table, self.quote_name),
1537 name=self.quote_name(name),
1538 check=check,
1539 )
1540
1541 def _delete_check_sql(self, model, name):
1542 if not self.connection.features.supports_table_check_constraints:
1543 return None
1544 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1545
1546 def _delete_constraint_sql(self, template, model, name):
1547 return Statement(
1548 template,
1549 table=Table(model._meta.db_table, self.quote_name),
1550 name=self.quote_name(name),
1551 )
1552
1553 def _constraint_names(
1554 self,
1555 model,
1556 column_names=None,
1557 unique=None,
1558 primary_key=None,
1559 index=None,
1560 foreign_key=None,
1561 check=None,
1562 type_=None,
1563 exclude=None,
1564 ):
1565 """Return all constraint names matching the columns and conditions."""
1566 if column_names is not None:
1567 column_names = [
1568 self.connection.introspection.identifier_converter(
1569 truncate_name(name, self.connection.ops.max_name_length())
1570 )
1571 if self.connection.features.truncates_names
1572 else self.connection.introspection.identifier_converter(name)
1573 for name in column_names
1574 ]
1575 with self.connection.cursor() as cursor:
1576 constraints = self.connection.introspection.get_constraints(
1577 cursor, model._meta.db_table
1578 )
1579 result = []
1580 for name, infodict in constraints.items():
1581 if column_names is None or column_names == infodict["columns"]:
1582 if unique is not None and infodict["unique"] != unique:
1583 continue
1584 if primary_key is not None and infodict["primary_key"] != primary_key:
1585 continue
1586 if index is not None and infodict["index"] != index:
1587 continue
1588 if check is not None and infodict["check"] != check:
1589 continue
1590 if foreign_key is not None and not infodict["foreign_key"]:
1591 continue
1592 if type_ is not None and infodict["type"] != type_:
1593 continue
1594 if not exclude or name not in exclude:
1595 result.append(name)
1596 return result
1597
1598 def _delete_primary_key(self, model, strict=False):
1599 constraint_names = self._constraint_names(model, primary_key=True)
1600 if strict and len(constraint_names) != 1:
1601 raise ValueError(
1602 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model._meta.db_table}"
1603 )
1604 for constraint_name in constraint_names:
1605 self.execute(self._delete_primary_key_sql(model, constraint_name))
1606
1607 def _create_primary_key_sql(self, model, field):
1608 return Statement(
1609 self.sql_create_pk,
1610 table=Table(model._meta.db_table, self.quote_name),
1611 name=self.quote_name(
1612 self._create_index_name(
1613 model._meta.db_table, [field.column], suffix="_pk"
1614 )
1615 ),
1616 columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1617 )
1618
1619 def _delete_primary_key_sql(self, model, name):
1620 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1621
1622 def _collate_sql(self, collation, old_collation=None, table_name=None):
1623 return "COLLATE " + self.quote_name(collation) if collation else ""