1import logging
2import operator
3from datetime import datetime
4
5from plain.models.backends.ddl_references import (
6 Columns,
7 Expressions,
8 ForeignKeyName,
9 IndexName,
10 Statement,
11 Table,
12)
13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
14from plain.models.constraints import Deferrable
15from plain.models.indexes import Index
16from plain.models.sql import Query
17from plain.models.transaction import TransactionManagementError, atomic
18from plain.utils import timezone
19
20logger = logging.getLogger("plain.models.backends.schema")
21
22
23def _is_relevant_relation(relation, altered_field):
24 """
25 When altering the given field, must constraints on its model from the given
26 relation be temporarily dropped?
27 """
28 field = relation.field
29 if field.many_to_many:
30 # M2M reverse field
31 return False
32 if altered_field.primary_key:
33 # Foreign key constraint on the primary key, which is being altered.
34 return True
35 # ForeignKey always targets 'id'
36 return altered_field.name == "id"
37
38
39def _all_related_fields(model):
40 # Related fields must be returned in a deterministic order.
41 return sorted(
42 model._meta._get_fields(
43 forward=False,
44 reverse=True,
45 include_hidden=True,
46 ),
47 key=operator.attrgetter("name"),
48 )
49
50
51def _related_non_m2m_objects(old_field, new_field):
52 # Filter out m2m objects from reverse relations.
53 # Return (old_relation, new_relation) tuples.
54 related_fields = zip(
55 (
56 obj
57 for obj in _all_related_fields(old_field.model)
58 if _is_relevant_relation(obj, old_field)
59 ),
60 (
61 obj
62 for obj in _all_related_fields(new_field.model)
63 if _is_relevant_relation(obj, new_field)
64 ),
65 )
66 for old_rel, new_rel in related_fields:
67 yield old_rel, new_rel
68 yield from _related_non_m2m_objects(
69 old_rel.remote_field,
70 new_rel.remote_field,
71 )
72
73
74class BaseDatabaseSchemaEditor:
75 """
76 This class and its subclasses are responsible for emitting schema-changing
77 statements to the databases - model creation/removal/alteration, field
78 renaming, index fiddling, and so on.
79 """
80
81 # Overrideable SQL templates
82 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
83 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
84 sql_delete_table = "DROP TABLE %(table)s CASCADE"
85
86 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
87 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
88 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
89 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
90 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
91 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
92 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
93 sql_alter_column_no_default_null = sql_alter_column_no_default
94 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
95 sql_rename_column = (
96 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
97 )
98 sql_update_with_default = (
99 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
100 )
101
102 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
103 sql_check_constraint = "CHECK (%(check)s)"
104 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
105 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
106
107 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
108 sql_delete_check = sql_delete_constraint
109
110 sql_create_unique = (
111 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
112 "UNIQUE (%(columns)s)%(deferrable)s"
113 )
114 sql_delete_unique = sql_delete_constraint
115
116 sql_create_fk = (
117 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
118 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
119 )
120 sql_create_inline_fk = None
121 sql_create_column_inline_fk = None
122 sql_delete_fk = sql_delete_constraint
123
124 sql_create_index = (
125 "CREATE INDEX %(name)s ON %(table)s "
126 "(%(columns)s)%(include)s%(extra)s%(condition)s"
127 )
128 sql_create_unique_index = (
129 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
130 "(%(columns)s)%(include)s%(condition)s"
131 )
132 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
133 sql_delete_index = "DROP INDEX %(name)s"
134
135 sql_create_pk = (
136 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
137 )
138 sql_delete_pk = sql_delete_constraint
139
140 sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
141 sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
142
143 def __init__(self, connection, collect_sql=False, atomic=True):
144 self.connection = connection
145 self.collect_sql = collect_sql
146 if self.collect_sql:
147 self.collected_sql = []
148 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
149
150 # State-managing methods
151
152 def __enter__(self):
153 self.deferred_sql = []
154 if self.atomic_migration:
155 self.atomic = atomic()
156 self.atomic.__enter__()
157 return self
158
159 def __exit__(self, exc_type, exc_value, traceback):
160 if exc_type is None:
161 for sql in self.deferred_sql:
162 self.execute(sql)
163 if self.atomic_migration:
164 self.atomic.__exit__(exc_type, exc_value, traceback)
165
166 # Core utility functions
167
168 def execute(self, sql, params=()):
169 """Execute the given SQL statement, with optional parameters."""
170 # Don't perform the transactional DDL check if SQL is being collected
171 # as it's not going to be executed anyway.
172 if (
173 not self.collect_sql
174 and self.connection.in_atomic_block
175 and not self.connection.features.can_rollback_ddl
176 ):
177 raise TransactionManagementError(
178 "Executing DDL statements while in a transaction on databases "
179 "that can't perform a rollback is prohibited."
180 )
181 # Account for non-string statement objects.
182 sql = str(sql)
183 # Log the command we're running, then run it
184 logger.debug(
185 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
186 )
187 if self.collect_sql:
188 ending = "" if sql.rstrip().endswith(";") else ";"
189 if params is not None:
190 self.collected_sql.append(
191 (sql % tuple(map(self.quote_value, params))) + ending
192 )
193 else:
194 self.collected_sql.append(sql + ending)
195 else:
196 with self.connection.cursor() as cursor:
197 cursor.execute(sql, params)
198
199 def quote_name(self, name):
200 return self.connection.ops.quote_name(name)
201
202 def table_sql(self, model):
203 """Take a model and return its table definition."""
204 # Create column SQL, add FK deferreds if needed.
205 column_sqls = []
206 params = []
207 for field in model._meta.local_fields:
208 # SQL.
209 definition, extra_params = self.column_sql(model, field)
210 if definition is None:
211 continue
212 # Check constraints can go on the column SQL here.
213 db_params = field.db_parameters(connection=self.connection)
214 if db_params["check"]:
215 definition += " " + self.sql_check_constraint % db_params
216 # Autoincrement SQL (for backends with inline variant).
217 col_type_suffix = field.db_type_suffix(connection=self.connection)
218 if col_type_suffix:
219 definition += f" {col_type_suffix}"
220 params.extend(extra_params)
221 # FK.
222 if field.remote_field and field.db_constraint:
223 to_table = field.remote_field.model._meta.db_table
224 to_column = field.remote_field.model._meta.get_field(
225 field.remote_field.field_name
226 ).column
227 if self.sql_create_inline_fk:
228 definition += " " + self.sql_create_inline_fk % {
229 "to_table": self.quote_name(to_table),
230 "to_column": self.quote_name(to_column),
231 }
232 elif self.connection.features.supports_foreign_keys:
233 self.deferred_sql.append(
234 self._create_fk_sql(
235 model, field, "_fk_%(to_table)s_%(to_column)s"
236 )
237 )
238 # Add the SQL to our big list.
239 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
240 # Autoincrement SQL (for backends with post table definition
241 # variant).
242 if field.get_internal_type() in ("PrimaryKeyField",):
243 autoinc_sql = self.connection.ops.autoinc_sql(
244 model._meta.db_table, field.column
245 )
246 if autoinc_sql:
247 self.deferred_sql.extend(autoinc_sql)
248 constraints = [
249 constraint.constraint_sql(model, self)
250 for constraint in model._meta.constraints
251 ]
252 sql = self.sql_create_table % {
253 "table": self.quote_name(model._meta.db_table),
254 "definition": ", ".join(
255 str(constraint)
256 for constraint in (*column_sqls, *constraints)
257 if constraint
258 ),
259 }
260 return sql, params
261
262 # Field <-> database mapping functions
263
264 def _iter_column_sql(
265 self, column_db_type, params, model, field, field_db_params, include_default
266 ):
267 yield column_db_type
268 if collation := field_db_params.get("collation"):
269 yield self._collate_sql(collation)
270 if self.connection.features.supports_comments_inline and field.db_comment:
271 yield self._comment_sql(field.db_comment)
272 # Work out nullability.
273 null = field.allow_null
274 # Include a default value, if requested.
275 include_default = (
276 include_default
277 and not self.skip_default(field)
278 and
279 # Don't include a default value if it's a nullable field and the
280 # default cannot be dropped in the ALTER COLUMN statement (e.g.
281 # MySQL longtext and longblob).
282 not (null and self.skip_default_on_alter(field))
283 )
284 if include_default:
285 default_value = self.effective_default(field)
286 if default_value is not None:
287 column_default = "DEFAULT " + self._column_default_sql(field)
288 if self.connection.features.requires_literal_defaults:
289 # Some databases can't take defaults as a parameter (Oracle).
290 # If this is the case, the individual schema backend should
291 # implement prepare_default().
292 yield column_default % self.prepare_default(default_value)
293 else:
294 yield column_default
295 params.append(default_value)
296
297 if not null:
298 yield "NOT NULL"
299 elif not self.connection.features.implied_column_null:
300 yield "NULL"
301
302 if field.primary_key:
303 yield "PRIMARY KEY"
304
305 def column_sql(self, model, field, include_default=False):
306 """
307 Return the column definition for a field. The field must already have
308 had set_attributes_from_name() called.
309 """
310 # Get the column's type and use that as the basis of the SQL.
311 field_db_params = field.db_parameters(connection=self.connection)
312 column_db_type = field_db_params["type"]
313 # Check for fields that aren't actually columns (e.g. M2M).
314 if column_db_type is None:
315 return None, None
316 params = []
317 return (
318 " ".join(
319 # This appends to the params being returned.
320 self._iter_column_sql(
321 column_db_type,
322 params,
323 model,
324 field,
325 field_db_params,
326 include_default,
327 )
328 ),
329 params,
330 )
331
332 def skip_default(self, field):
333 """
334 Some backends don't accept default values for certain columns types
335 (i.e. MySQL longtext and longblob).
336 """
337 return False
338
339 def skip_default_on_alter(self, field):
340 """
341 Some backends don't accept default values for certain columns types
342 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
343 """
344 return False
345
346 def prepare_default(self, value):
347 """
348 Only used for backends which have requires_literal_defaults feature
349 """
350 raise NotImplementedError(
351 "subclasses of BaseDatabaseSchemaEditor for backends which have "
352 "requires_literal_defaults must provide a prepare_default() method"
353 )
354
355 def _column_default_sql(self, field):
356 """
357 Return the SQL to use in a DEFAULT clause. The resulting string should
358 contain a '%s' placeholder for a default value.
359 """
360 return "%s"
361
362 @staticmethod
363 def _effective_default(field):
364 # This method allows testing its logic without a connection.
365 if field.has_default():
366 default = field.get_default()
367 elif (
368 not field.allow_null and not field.required and field.empty_strings_allowed
369 ):
370 if field.get_internal_type() == "BinaryField":
371 default = b""
372 else:
373 default = ""
374 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
375 internal_type = field.get_internal_type()
376 if internal_type == "DateTimeField":
377 default = timezone.now()
378 else:
379 default = datetime.now()
380 if internal_type == "DateField":
381 default = default.date()
382 elif internal_type == "TimeField":
383 default = default.time()
384 else:
385 default = None
386 return default
387
388 def effective_default(self, field):
389 """Return a field's effective database default value."""
390 return field.get_db_prep_save(self._effective_default(field), self.connection)
391
392 def quote_value(self, value):
393 """
394 Return a quoted version of the value so it's safe to use in an SQL
395 string. This is not safe against injection from user code; it is
396 intended only for use in making SQL scripts or preparing default values
397 for particularly tricky backends (defaults are not user-defined, though,
398 so this is safe).
399 """
400 raise NotImplementedError()
401
402 # Actions
403
404 def create_model(self, model):
405 """
406 Create a table and any accompanying indexes or unique constraints for
407 the given `model`.
408 """
409 sql, params = self.table_sql(model)
410 # Prevent using [] as params, in the case a literal '%' is used in the
411 # definition.
412 self.execute(sql, params or None)
413
414 if self.connection.features.supports_comments:
415 # Add table comment.
416 if model._meta.db_table_comment:
417 self.alter_db_table_comment(model, None, model._meta.db_table_comment)
418 # Add column comments.
419 if not self.connection.features.supports_comments_inline:
420 for field in model._meta.local_fields:
421 if field.db_comment:
422 field_db_params = field.db_parameters(
423 connection=self.connection
424 )
425 field_type = field_db_params["type"]
426 self.execute(
427 *self._alter_column_comment_sql(
428 model, field, field_type, field.db_comment
429 )
430 )
431 # Add any field index (deferred as SQLite _remake_table needs it).
432 self.deferred_sql.extend(self._model_indexes_sql(model))
433
434 def delete_model(self, model):
435 """Delete a model from the database."""
436
437 # Delete the table
438 self.execute(
439 self.sql_delete_table
440 % {
441 "table": self.quote_name(model._meta.db_table),
442 }
443 )
444 # Remove all deferred statements referencing the deleted table.
445 for sql in list(self.deferred_sql):
446 if isinstance(sql, Statement) and sql.references_table(
447 model._meta.db_table
448 ):
449 self.deferred_sql.remove(sql)
450
451 def add_index(self, model, index):
452 """Add an index on a model."""
453 if (
454 index.contains_expressions
455 and not self.connection.features.supports_expression_indexes
456 ):
457 return None
458 # Index.create_sql returns interpolated SQL which makes params=None a
459 # necessity to avoid escaping attempts on execution.
460 self.execute(index.create_sql(model, self), params=None)
461
462 def remove_index(self, model, index):
463 """Remove an index from a model."""
464 if (
465 index.contains_expressions
466 and not self.connection.features.supports_expression_indexes
467 ):
468 return None
469 self.execute(index.remove_sql(model, self))
470
471 def rename_index(self, model, old_index, new_index):
472 if self.connection.features.can_rename_index:
473 self.execute(
474 self._rename_index_sql(model, old_index.name, new_index.name),
475 params=None,
476 )
477 else:
478 self.remove_index(model, old_index)
479 self.add_index(model, new_index)
480
481 def add_constraint(self, model, constraint):
482 """Add a constraint to a model."""
483 sql = constraint.create_sql(model, self)
484 if sql:
485 # Constraint.create_sql returns interpolated SQL which makes
486 # params=None a necessity to avoid escaping attempts on execution.
487 self.execute(sql, params=None)
488
489 def remove_constraint(self, model, constraint):
490 """Remove a constraint from a model."""
491 sql = constraint.remove_sql(model, self)
492 if sql:
493 self.execute(sql)
494
495 def alter_db_table(self, model, old_db_table, new_db_table):
496 """Rename the table a model points to."""
497 if old_db_table == new_db_table or (
498 self.connection.features.ignores_table_name_case
499 and old_db_table.lower() == new_db_table.lower()
500 ):
501 return
502 self.execute(
503 self.sql_rename_table
504 % {
505 "old_table": self.quote_name(old_db_table),
506 "new_table": self.quote_name(new_db_table),
507 }
508 )
509 # Rename all references to the old table name.
510 for sql in self.deferred_sql:
511 if isinstance(sql, Statement):
512 sql.rename_table_references(old_db_table, new_db_table)
513
514 def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
515 self.execute(
516 self.sql_alter_table_comment
517 % {
518 "table": self.quote_name(model._meta.db_table),
519 "comment": self.quote_value(new_db_table_comment or ""),
520 }
521 )
522
523 def add_field(self, model, field):
524 """
525 Create a field on a model. Usually involves adding a column, but may
526 involve adding a table instead (for M2M fields).
527 """
528 # Get the column's definition
529 definition, params = self.column_sql(model, field, include_default=True)
530 # It might not actually have a column behind it
531 if definition is None:
532 return
533 if col_type_suffix := field.db_type_suffix(connection=self.connection):
534 definition += f" {col_type_suffix}"
535 # Check constraints can go on the column SQL here
536 db_params = field.db_parameters(connection=self.connection)
537 if db_params["check"]:
538 definition += " " + self.sql_check_constraint % db_params
539 if (
540 field.remote_field
541 and self.connection.features.supports_foreign_keys
542 and field.db_constraint
543 ):
544 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
545 # Add FK constraint inline, if supported.
546 if self.sql_create_column_inline_fk:
547 to_table = field.remote_field.model._meta.db_table
548 to_column = field.remote_field.model._meta.get_field(
549 field.remote_field.field_name
550 ).column
551 namespace, _ = split_identifier(model._meta.db_table)
552 definition += " " + self.sql_create_column_inline_fk % {
553 "name": self._fk_constraint_name(model, field, constraint_suffix),
554 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
555 "column": self.quote_name(field.column),
556 "to_table": self.quote_name(to_table),
557 "to_column": self.quote_name(to_column),
558 "deferrable": self.connection.ops.deferrable_sql(),
559 }
560 # Otherwise, add FK constraints later.
561 else:
562 self.deferred_sql.append(
563 self._create_fk_sql(model, field, constraint_suffix)
564 )
565 # Build the SQL and run it
566 sql = self.sql_create_column % {
567 "table": self.quote_name(model._meta.db_table),
568 "column": self.quote_name(field.column),
569 "definition": definition,
570 }
571 self.execute(sql, params)
572 # Drop the default if we need to
573 # (Plain usually does not use in-database defaults)
574 if (
575 not self.skip_default_on_alter(field)
576 and self.effective_default(field) is not None
577 ):
578 changes_sql, params = self._alter_column_default_sql(
579 model, None, field, drop=True
580 )
581 sql = self.sql_alter_column % {
582 "table": self.quote_name(model._meta.db_table),
583 "changes": changes_sql,
584 }
585 self.execute(sql, params)
586 # Add field comment, if required.
587 if (
588 field.db_comment
589 and self.connection.features.supports_comments
590 and not self.connection.features.supports_comments_inline
591 ):
592 field_type = db_params["type"]
593 self.execute(
594 *self._alter_column_comment_sql(
595 model, field, field_type, field.db_comment
596 )
597 )
598 # Add an index, if required
599 self.deferred_sql.extend(self._field_indexes_sql(model, field))
600 # Reset connection if required
601 if self.connection.features.connection_persists_old_columns:
602 self.connection.close()
603
604 def remove_field(self, model, field):
605 """
606 Remove a field from a model. Usually involves deleting a column,
607 but for M2Ms may involve deleting a table.
608 """
609 # It might not actually have a column behind it
610 if field.db_parameters(connection=self.connection)["type"] is None:
611 return
612 # Drop any FK constraints, MySQL requires explicit deletion
613 if field.remote_field:
614 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
615 for fk_name in fk_names:
616 self.execute(self._delete_fk_sql(model, fk_name))
617 # Delete the column
618 sql = self.sql_delete_column % {
619 "table": self.quote_name(model._meta.db_table),
620 "column": self.quote_name(field.column),
621 }
622 self.execute(sql)
623 # Reset connection if required
624 if self.connection.features.connection_persists_old_columns:
625 self.connection.close()
626 # Remove all deferred statements referencing the deleted column.
627 for sql in list(self.deferred_sql):
628 if isinstance(sql, Statement) and sql.references_column(
629 model._meta.db_table, field.column
630 ):
631 self.deferred_sql.remove(sql)
632
633 def alter_field(self, model, old_field, new_field, strict=False):
634 """
635 Allow a field's type, uniqueness, nullability, default, column,
636 constraints, etc. to be modified.
637 `old_field` is required to compute the necessary changes.
638 If `strict` is True, raise errors if the old column does not match
639 `old_field` precisely.
640 """
641 if not self._field_should_be_altered(old_field, new_field):
642 return
643 # Ensure this field is even column-based
644 old_db_params = old_field.db_parameters(connection=self.connection)
645 old_type = old_db_params["type"]
646 new_db_params = new_field.db_parameters(connection=self.connection)
647 new_type = new_db_params["type"]
648 if (old_type is None and old_field.remote_field is None) or (
649 new_type is None and new_field.remote_field is None
650 ):
651 raise ValueError(
652 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
653 "db_type (are you using a badly-written custom field?)",
654 )
655 elif (
656 old_type is None
657 and new_type is None
658 and (old_field.remote_field.through and new_field.remote_field.through)
659 ):
660 # Both sides have through models; this is a no-op.
661 return
662 elif old_type is None or new_type is None:
663 raise ValueError(
664 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
665 "(you cannot alter to or from M2M fields, or add or remove "
666 "through= on M2M fields)"
667 )
668
669 self._alter_field(
670 model,
671 old_field,
672 new_field,
673 old_type,
674 new_type,
675 old_db_params,
676 new_db_params,
677 strict,
678 )
679
680 def _field_db_check(self, field, field_db_params):
681 # Always check constraints with the same mocked column name to avoid
682 # recreating constrains when the column is renamed.
683 check_constraints = self.connection.data_type_check_constraints
684 data = field.db_type_parameters(self.connection)
685 data["column"] = "__column_name__"
686 try:
687 return check_constraints[field.get_internal_type()] % data
688 except KeyError:
689 return None
690
691 def _alter_field(
692 self,
693 model,
694 old_field,
695 new_field,
696 old_type,
697 new_type,
698 old_db_params,
699 new_db_params,
700 strict=False,
701 ):
702 """Perform a "physical" (non-ManyToMany) field update."""
703 # Drop any FK constraints, we'll remake them later
704 fks_dropped = set()
705 if (
706 self.connection.features.supports_foreign_keys
707 and old_field.remote_field
708 and old_field.db_constraint
709 and self._field_should_be_altered(
710 old_field,
711 new_field,
712 ignore={"db_comment"},
713 )
714 ):
715 fk_names = self._constraint_names(
716 model, [old_field.column], foreign_key=True
717 )
718 if strict and len(fk_names) != 1:
719 raise ValueError(
720 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model._meta.db_table}.{old_field.column}"
721 )
722 for fk_name in fk_names:
723 fks_dropped.add((old_field.column,))
724 self.execute(self._delete_fk_sql(model, fk_name))
725 # Has unique been removed?
726 if old_field.primary_key and (
727 not new_field.primary_key
728 or self._field_became_primary_key(old_field, new_field)
729 ):
730 # Find the unique constraint for this field
731 meta_constraint_names = {
732 constraint.name for constraint in model._meta.constraints
733 }
734 constraint_names = self._constraint_names(
735 model,
736 [old_field.column],
737 unique=True,
738 primary_key=False,
739 exclude=meta_constraint_names,
740 )
741 if strict and len(constraint_names) != 1:
742 raise ValueError(
743 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model._meta.db_table}.{old_field.column}"
744 )
745 for constraint_name in constraint_names:
746 self.execute(self._delete_unique_sql(model, constraint_name))
747 # Drop incoming FK constraints if the field is a primary key or unique,
748 # which might be a to_field target, and things are going to change.
749 old_collation = old_db_params.get("collation")
750 new_collation = new_db_params.get("collation")
751 drop_foreign_keys = (
752 self.connection.features.supports_foreign_keys
753 and (old_field.primary_key and new_field.primary_key)
754 and ((old_type != new_type) or (old_collation != new_collation))
755 )
756 if drop_foreign_keys:
757 # '_meta.related_field' also contains M2M reverse fields, these
758 # will be filtered out
759 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
760 rel_fk_names = self._constraint_names(
761 new_rel.related_model, [new_rel.field.column], foreign_key=True
762 )
763 for fk_name in rel_fk_names:
764 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
765 # Removed an index? (no strict check, as multiple indexes are possible)
766 # Remove indexes if db_index switched to False or a unique constraint
767 # will now be used in lieu of an index. The following lines from the
768 # truth table show all True cases; the rest are False:
769 #
770 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
771 # ------------------------------------------------------------------------------
772 # True | False | False | False
773 # True | False | False | True
774 # True | False | True | True
775 if (
776 (old_field.remote_field and old_field.db_index)
777 and not old_field.primary_key
778 and (
779 not (new_field.remote_field and new_field.db_index)
780 or new_field.primary_key
781 )
782 ):
783 # Find the index for this field
784 meta_index_names = {index.name for index in model._meta.indexes}
785 # Retrieve only BTREE indexes since this is what's created with
786 # db_index=True.
787 index_names = self._constraint_names(
788 model,
789 [old_field.column],
790 index=True,
791 type_=Index.suffix,
792 exclude=meta_index_names,
793 )
794 for index_name in index_names:
795 # The only way to check if an index was created with
796 # db_index=True or with Index(['field'], name='foo')
797 # is to look at its name (refs #28053).
798 self.execute(self._delete_index_sql(model, index_name))
799 # Change check constraints?
800 old_db_check = self._field_db_check(old_field, old_db_params)
801 new_db_check = self._field_db_check(new_field, new_db_params)
802 if old_db_check != new_db_check and old_db_check:
803 meta_constraint_names = {
804 constraint.name for constraint in model._meta.constraints
805 }
806 constraint_names = self._constraint_names(
807 model,
808 [old_field.column],
809 check=True,
810 exclude=meta_constraint_names,
811 )
812 if strict and len(constraint_names) != 1:
813 raise ValueError(
814 f"Found wrong number ({len(constraint_names)}) of check constraints for {model._meta.db_table}.{old_field.column}"
815 )
816 for constraint_name in constraint_names:
817 self.execute(self._delete_check_sql(model, constraint_name))
818 # Have they renamed the column?
819 if old_field.column != new_field.column:
820 self.execute(
821 self._rename_field_sql(
822 model._meta.db_table, old_field, new_field, new_type
823 )
824 )
825 # Rename all references to the renamed column.
826 for sql in self.deferred_sql:
827 if isinstance(sql, Statement):
828 sql.rename_column_references(
829 model._meta.db_table, old_field.column, new_field.column
830 )
831 # Next, start accumulating actions to do
832 actions = []
833 null_actions = []
834 post_actions = []
835 # Type suffix change? (e.g. auto increment).
836 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
837 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
838 # Type, collation, or comment change?
839 if (
840 old_type != new_type
841 or old_type_suffix != new_type_suffix
842 or old_collation != new_collation
843 or (
844 self.connection.features.supports_comments
845 and old_field.db_comment != new_field.db_comment
846 )
847 ):
848 fragment, other_actions = self._alter_column_type_sql(
849 model, old_field, new_field, new_type, old_collation, new_collation
850 )
851 actions.append(fragment)
852 post_actions.extend(other_actions)
853 # When changing a column NULL constraint to NOT NULL with a given
854 # default value, we need to perform 4 steps:
855 # 1. Add a default for new incoming writes
856 # 2. Update existing NULL rows with new default
857 # 3. Replace NULL constraint with NOT NULL
858 # 4. Drop the default again.
859 # Default change?
860 needs_database_default = False
861 if old_field.allow_null and not new_field.allow_null:
862 old_default = self.effective_default(old_field)
863 new_default = self.effective_default(new_field)
864 if (
865 not self.skip_default_on_alter(new_field)
866 and old_default != new_default
867 and new_default is not None
868 ):
869 needs_database_default = True
870 actions.append(
871 self._alter_column_default_sql(model, old_field, new_field)
872 )
873 # Nullability change?
874 if old_field.allow_null != new_field.allow_null:
875 fragment = self._alter_column_null_sql(model, old_field, new_field)
876 if fragment:
877 null_actions.append(fragment)
878 # Only if we have a default and there is a change from NULL to NOT NULL
879 four_way_default_alteration = new_field.has_default() and (
880 old_field.allow_null and not new_field.allow_null
881 )
882 if actions or null_actions:
883 if not four_way_default_alteration:
884 # If we don't have to do a 4-way default alteration we can
885 # directly run a (NOT) NULL alteration
886 actions += null_actions
887 # Combine actions together if we can (e.g. postgres)
888 if self.connection.features.supports_combined_alters and actions:
889 sql, params = tuple(zip(*actions))
890 actions = [(", ".join(sql), sum(params, []))]
891 # Apply those actions
892 for sql, params in actions:
893 self.execute(
894 self.sql_alter_column
895 % {
896 "table": self.quote_name(model._meta.db_table),
897 "changes": sql,
898 },
899 params,
900 )
901 if four_way_default_alteration:
902 # Update existing rows with default value
903 self.execute(
904 self.sql_update_with_default
905 % {
906 "table": self.quote_name(model._meta.db_table),
907 "column": self.quote_name(new_field.column),
908 "default": "%s",
909 },
910 [new_default],
911 )
912 # Since we didn't run a NOT NULL change before we need to do it
913 # now
914 for sql, params in null_actions:
915 self.execute(
916 self.sql_alter_column
917 % {
918 "table": self.quote_name(model._meta.db_table),
919 "changes": sql,
920 },
921 params,
922 )
923 if post_actions:
924 for sql, params in post_actions:
925 self.execute(sql, params)
926 # If primary_key changed to False, delete the primary key constraint.
927 if old_field.primary_key and not new_field.primary_key:
928 self._delete_primary_key(model, strict)
929
930 # Added an index? Add an index if db_index switched to True or a unique
931 # constraint will no longer be used in lieu of an index. The following
932 # lines from the truth table show all True cases; the rest are False:
933 #
934 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
935 # ------------------------------------------------------------------------------
936 # False | False | True | False
937 # False | True | True | False
938 # True | True | True | False
939 if (
940 (
941 not (old_field.remote_field and old_field.db_index)
942 or old_field.primary_key
943 )
944 and (new_field.remote_field and new_field.db_index)
945 and not new_field.primary_key
946 ):
947 self.execute(self._create_index_sql(model, fields=[new_field]))
948 # Type alteration on primary key? Then we need to alter the column
949 # referring to us.
950 rels_to_update = []
951 if drop_foreign_keys:
952 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
953 # Changed to become primary key?
954 if self._field_became_primary_key(old_field, new_field):
955 # Make the new one
956 self.execute(self._create_primary_key_sql(model, new_field))
957 # Update all referencing columns
958 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
959 # Handle our type alters on the other end of rels from the PK stuff above
960 for old_rel, new_rel in rels_to_update:
961 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
962 rel_type = rel_db_params["type"]
963 rel_collation = rel_db_params.get("collation")
964 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
965 old_rel_collation = old_rel_db_params.get("collation")
966 fragment, other_actions = self._alter_column_type_sql(
967 new_rel.related_model,
968 old_rel.field,
969 new_rel.field,
970 rel_type,
971 old_rel_collation,
972 rel_collation,
973 )
974 self.execute(
975 self.sql_alter_column
976 % {
977 "table": self.quote_name(new_rel.related_model._meta.db_table),
978 "changes": fragment[0],
979 },
980 fragment[1],
981 )
982 for sql, params in other_actions:
983 self.execute(sql, params)
984 # Does it have a foreign key?
985 if (
986 self.connection.features.supports_foreign_keys
987 and new_field.remote_field
988 and (
989 fks_dropped or not old_field.remote_field or not old_field.db_constraint
990 )
991 and new_field.db_constraint
992 ):
993 self.execute(
994 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
995 )
996 # Rebuild FKs that pointed to us if we previously had to drop them
997 if drop_foreign_keys:
998 for _, rel in rels_to_update:
999 if rel.field.db_constraint:
1000 self.execute(
1001 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1002 )
1003 # Does it have check constraints we need to add?
1004 if old_db_check != new_db_check and new_db_check:
1005 constraint_name = self._create_index_name(
1006 model._meta.db_table, [new_field.column], suffix="_check"
1007 )
1008 self.execute(
1009 self._create_check_sql(model, constraint_name, new_db_params["check"])
1010 )
1011 # Drop the default if we need to
1012 # (Plain usually does not use in-database defaults)
1013 if needs_database_default:
1014 changes_sql, params = self._alter_column_default_sql(
1015 model, old_field, new_field, drop=True
1016 )
1017 sql = self.sql_alter_column % {
1018 "table": self.quote_name(model._meta.db_table),
1019 "changes": changes_sql,
1020 }
1021 self.execute(sql, params)
1022 # Reset connection if required
1023 if self.connection.features.connection_persists_old_columns:
1024 self.connection.close()
1025
1026 def _alter_column_null_sql(self, model, old_field, new_field):
1027 """
1028 Hook to specialize column null alteration.
1029
1030 Return a (sql, params) fragment to set a column to null or non-null
1031 as required by new_field, or None if no changes are required.
1032 """
1033 new_db_params = new_field.db_parameters(connection=self.connection)
1034 sql = (
1035 self.sql_alter_column_null
1036 if new_field.allow_null
1037 else self.sql_alter_column_not_null
1038 )
1039 return (
1040 sql
1041 % {
1042 "column": self.quote_name(new_field.column),
1043 "type": new_db_params["type"],
1044 },
1045 [],
1046 )
1047
1048 def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1049 """
1050 Hook to specialize column default alteration.
1051
1052 Return a (sql, params) fragment to add or drop (depending on the drop
1053 argument) a default to new_field's column.
1054 """
1055 new_default = self.effective_default(new_field)
1056 default = self._column_default_sql(new_field)
1057 params = [new_default]
1058
1059 if drop:
1060 params = []
1061 elif self.connection.features.requires_literal_defaults:
1062 # Some databases (Oracle) can't take defaults as a parameter
1063 # If this is the case, the SchemaEditor for that database should
1064 # implement prepare_default().
1065 default = self.prepare_default(new_default)
1066 params = []
1067
1068 new_db_params = new_field.db_parameters(connection=self.connection)
1069 if drop:
1070 if new_field.allow_null:
1071 sql = self.sql_alter_column_no_default_null
1072 else:
1073 sql = self.sql_alter_column_no_default
1074 else:
1075 sql = self.sql_alter_column_default
1076 return (
1077 sql
1078 % {
1079 "column": self.quote_name(new_field.column),
1080 "type": new_db_params["type"],
1081 "default": default,
1082 },
1083 params,
1084 )
1085
1086 def _alter_column_type_sql(
1087 self, model, old_field, new_field, new_type, old_collation, new_collation
1088 ):
1089 """
1090 Hook to specialize column type alteration for different backends,
1091 for cases when a creation type is different to an alteration type
1092 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1093
1094 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1095 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1096 run once the field is altered.
1097 """
1098 other_actions = []
1099 if collate_sql := self._collate_sql(
1100 new_collation, old_collation, model._meta.db_table
1101 ):
1102 collate_sql = f" {collate_sql}"
1103 else:
1104 collate_sql = ""
1105 # Comment change?
1106 comment_sql = ""
1107 if self.connection.features.supports_comments and not new_field.many_to_many:
1108 if old_field.db_comment != new_field.db_comment:
1109 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1110 # 'COMMENT ON ...' at the same time.
1111 sql, params = self._alter_column_comment_sql(
1112 model, new_field, new_type, new_field.db_comment
1113 )
1114 if sql:
1115 other_actions.append((sql, params))
1116 if new_field.db_comment:
1117 comment_sql = self._comment_sql(new_field.db_comment)
1118 return (
1119 (
1120 self.sql_alter_column_type
1121 % {
1122 "column": self.quote_name(new_field.column),
1123 "type": new_type,
1124 "collation": collate_sql,
1125 "comment": comment_sql,
1126 },
1127 [],
1128 ),
1129 other_actions,
1130 )
1131
1132 def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1133 return (
1134 self.sql_alter_column_comment
1135 % {
1136 "table": self.quote_name(model._meta.db_table),
1137 "column": self.quote_name(new_field.column),
1138 "comment": self._comment_sql(new_db_comment),
1139 },
1140 [],
1141 )
1142
1143 def _comment_sql(self, comment):
1144 return self.quote_value(comment or "")
1145
1146 def _alter_many_to_many(self, model, old_field, new_field, strict):
1147 """Alter M2Ms to repoint their to= endpoints."""
1148 # Rename the through table
1149 if (
1150 old_field.remote_field.through._meta.db_table
1151 != new_field.remote_field.through._meta.db_table
1152 ):
1153 self.alter_db_table(
1154 old_field.remote_field.through,
1155 old_field.remote_field.through._meta.db_table,
1156 new_field.remote_field.through._meta.db_table,
1157 )
1158 # Repoint the FK to the other side
1159 self.alter_field(
1160 new_field.remote_field.through,
1161 # The field that points to the target model is needed, so we can
1162 # tell alter_field to change it - this is m2m_reverse_field_name()
1163 # (as opposed to m2m_field_name(), which points to our model).
1164 old_field.remote_field.through._meta.get_field(
1165 old_field.m2m_reverse_field_name()
1166 ),
1167 new_field.remote_field.through._meta.get_field(
1168 new_field.m2m_reverse_field_name()
1169 ),
1170 )
1171 self.alter_field(
1172 new_field.remote_field.through,
1173 # for self-referential models we need to alter field from the other end too
1174 old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1175 new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1176 )
1177
1178 def _create_index_name(self, table_name, column_names, suffix=""):
1179 """
1180 Generate a unique name for an index/unique constraint.
1181
1182 The name is divided into 3 parts: the table name, the column names,
1183 and a unique digest and suffix.
1184 """
1185 _, table_name = split_identifier(table_name)
1186 hash_suffix_part = (
1187 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1188 )
1189 max_length = self.connection.ops.max_name_length() or 200
1190 # If everything fits into max_length, use that name.
1191 index_name = "{}_{}_{}".format(
1192 table_name, "_".join(column_names), hash_suffix_part
1193 )
1194 if len(index_name) <= max_length:
1195 return index_name
1196 # Shorten a long suffix.
1197 if len(hash_suffix_part) > max_length / 3:
1198 hash_suffix_part = hash_suffix_part[: max_length // 3]
1199 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1200 index_name = "{}_{}_{}".format(
1201 table_name[:other_length],
1202 "_".join(column_names)[:other_length],
1203 hash_suffix_part,
1204 )
1205 # Prepend D if needed to prevent the name from starting with an
1206 # underscore or a number (not permitted on Oracle).
1207 if index_name[0] == "_" or index_name[0].isdigit():
1208 index_name = f"D{index_name[:-1]}"
1209 return index_name
1210
1211 def _index_condition_sql(self, condition):
1212 if condition:
1213 return " WHERE " + condition
1214 return ""
1215
1216 def _index_include_sql(self, model, columns):
1217 if not columns or not self.connection.features.supports_covering_indexes:
1218 return ""
1219 return Statement(
1220 " INCLUDE (%(columns)s)",
1221 columns=Columns(model._meta.db_table, columns, self.quote_name),
1222 )
1223
1224 def _create_index_sql(
1225 self,
1226 model,
1227 *,
1228 fields=None,
1229 name=None,
1230 suffix="",
1231 using="",
1232 col_suffixes=(),
1233 sql=None,
1234 opclasses=(),
1235 condition=None,
1236 include=None,
1237 expressions=None,
1238 ):
1239 """
1240 Return the SQL statement to create the index for one or several fields
1241 or expressions. `sql` can be specified if the syntax differs from the
1242 standard (GIS indexes, ...).
1243 """
1244 fields = fields or []
1245 expressions = expressions or []
1246 compiler = Query(model, alias_cols=False).get_compiler()
1247 columns = [field.column for field in fields]
1248 sql_create_index = sql or self.sql_create_index
1249 table = model._meta.db_table
1250
1251 def create_index_name(*args, **kwargs):
1252 nonlocal name
1253 if name is None:
1254 name = self._create_index_name(*args, **kwargs)
1255 return self.quote_name(name)
1256
1257 return Statement(
1258 sql_create_index,
1259 table=Table(table, self.quote_name),
1260 name=IndexName(table, columns, suffix, create_index_name),
1261 using=using,
1262 columns=(
1263 self._index_columns(table, columns, col_suffixes, opclasses)
1264 if columns
1265 else Expressions(table, expressions, compiler, self.quote_value)
1266 ),
1267 extra="",
1268 condition=self._index_condition_sql(condition),
1269 include=self._index_include_sql(model, include),
1270 )
1271
1272 def _delete_index_sql(self, model, name, sql=None):
1273 return Statement(
1274 sql or self.sql_delete_index,
1275 table=Table(model._meta.db_table, self.quote_name),
1276 name=self.quote_name(name),
1277 )
1278
1279 def _rename_index_sql(self, model, old_name, new_name):
1280 return Statement(
1281 self.sql_rename_index,
1282 table=Table(model._meta.db_table, self.quote_name),
1283 old_name=self.quote_name(old_name),
1284 new_name=self.quote_name(new_name),
1285 )
1286
1287 def _index_columns(self, table, columns, col_suffixes, opclasses):
1288 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1289
1290 def _model_indexes_sql(self, model):
1291 """
1292 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1293 """
1294 output = []
1295 for field in model._meta.local_fields:
1296 output.extend(self._field_indexes_sql(model, field))
1297
1298 for index in model._meta.indexes:
1299 if (
1300 not index.contains_expressions
1301 or self.connection.features.supports_expression_indexes
1302 ):
1303 output.append(index.create_sql(model, self))
1304 return output
1305
1306 def _field_indexes_sql(self, model, field):
1307 """
1308 Return a list of all index SQL statements for the specified field.
1309 """
1310 output = []
1311 if self._field_should_be_indexed(model, field):
1312 output.append(self._create_index_sql(model, fields=[field]))
1313 return output
1314
1315 def _field_should_be_altered(self, old_field, new_field, ignore=None):
1316 ignore = ignore or set()
1317 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1318 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1319 # Don't alter when:
1320 # - changing only a field name
1321 # - changing an attribute that doesn't affect the schema
1322 # - changing an attribute in the provided set of ignored attributes
1323 # - adding only a db_column and the column name is not changed
1324 for attr in ignore.union(old_field.non_db_attrs):
1325 old_kwargs.pop(attr, None)
1326 for attr in ignore.union(new_field.non_db_attrs):
1327 new_kwargs.pop(attr, None)
1328 return self.quote_name(old_field.column) != self.quote_name(
1329 new_field.column
1330 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1331
1332 def _field_should_be_indexed(self, model, field):
1333 return (field.remote_field and field.db_index) and not field.primary_key
1334
1335 def _field_became_primary_key(self, old_field, new_field):
1336 return not old_field.primary_key and new_field.primary_key
1337
1338 def _rename_field_sql(self, table, old_field, new_field, new_type):
1339 return self.sql_rename_column % {
1340 "table": self.quote_name(table),
1341 "old_column": self.quote_name(old_field.column),
1342 "new_column": self.quote_name(new_field.column),
1343 "type": new_type,
1344 }
1345
1346 def _create_fk_sql(self, model, field, suffix):
1347 table = Table(model._meta.db_table, self.quote_name)
1348 name = self._fk_constraint_name(model, field, suffix)
1349 column = Columns(model._meta.db_table, [field.column], self.quote_name)
1350 to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1351 to_column = Columns(
1352 field.target_field.model._meta.db_table,
1353 [field.target_field.column],
1354 self.quote_name,
1355 )
1356 deferrable = self.connection.ops.deferrable_sql()
1357 return Statement(
1358 self.sql_create_fk,
1359 table=table,
1360 name=name,
1361 column=column,
1362 to_table=to_table,
1363 to_column=to_column,
1364 deferrable=deferrable,
1365 )
1366
1367 def _fk_constraint_name(self, model, field, suffix):
1368 def create_fk_name(*args, **kwargs):
1369 return self.quote_name(self._create_index_name(*args, **kwargs))
1370
1371 return ForeignKeyName(
1372 model._meta.db_table,
1373 [field.column],
1374 split_identifier(field.target_field.model._meta.db_table)[1],
1375 [field.target_field.column],
1376 suffix,
1377 create_fk_name,
1378 )
1379
1380 def _delete_fk_sql(self, model, name):
1381 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1382
1383 def _deferrable_constraint_sql(self, deferrable):
1384 if deferrable is None:
1385 return ""
1386 if deferrable == Deferrable.DEFERRED:
1387 return " DEFERRABLE INITIALLY DEFERRED"
1388 if deferrable == Deferrable.IMMEDIATE:
1389 return " DEFERRABLE INITIALLY IMMEDIATE"
1390
1391 def _unique_sql(
1392 self,
1393 model,
1394 fields,
1395 name,
1396 condition=None,
1397 deferrable=None,
1398 include=None,
1399 opclasses=None,
1400 expressions=None,
1401 ):
1402 if (
1403 deferrable
1404 and not self.connection.features.supports_deferrable_unique_constraints
1405 ):
1406 return None
1407 if condition or include or opclasses or expressions:
1408 # Databases support conditional, covering, and functional unique
1409 # constraints via a unique index.
1410 sql = self._create_unique_sql(
1411 model,
1412 fields,
1413 name=name,
1414 condition=condition,
1415 include=include,
1416 opclasses=opclasses,
1417 expressions=expressions,
1418 )
1419 if sql:
1420 self.deferred_sql.append(sql)
1421 return None
1422 constraint = self.sql_unique_constraint % {
1423 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1424 "deferrable": self._deferrable_constraint_sql(deferrable),
1425 }
1426 return self.sql_constraint % {
1427 "name": self.quote_name(name),
1428 "constraint": constraint,
1429 }
1430
1431 def _create_unique_sql(
1432 self,
1433 model,
1434 fields,
1435 name=None,
1436 condition=None,
1437 deferrable=None,
1438 include=None,
1439 opclasses=None,
1440 expressions=None,
1441 ):
1442 if (
1443 (
1444 deferrable
1445 and not self.connection.features.supports_deferrable_unique_constraints
1446 )
1447 or (condition and not self.connection.features.supports_partial_indexes)
1448 or (include and not self.connection.features.supports_covering_indexes)
1449 or (
1450 expressions and not self.connection.features.supports_expression_indexes
1451 )
1452 ):
1453 return None
1454
1455 compiler = Query(model, alias_cols=False).get_compiler()
1456 table = model._meta.db_table
1457 columns = [field.column for field in fields]
1458 if name is None:
1459 name = self._unique_constraint_name(table, columns, quote=True)
1460 else:
1461 name = self.quote_name(name)
1462 if condition or include or opclasses or expressions:
1463 sql = self.sql_create_unique_index
1464 else:
1465 sql = self.sql_create_unique
1466 if columns:
1467 columns = self._index_columns(
1468 table, columns, col_suffixes=(), opclasses=opclasses
1469 )
1470 else:
1471 columns = Expressions(table, expressions, compiler, self.quote_value)
1472 return Statement(
1473 sql,
1474 table=Table(table, self.quote_name),
1475 name=name,
1476 columns=columns,
1477 condition=self._index_condition_sql(condition),
1478 deferrable=self._deferrable_constraint_sql(deferrable),
1479 include=self._index_include_sql(model, include),
1480 )
1481
1482 def _unique_constraint_name(self, table, columns, quote=True):
1483 if quote:
1484
1485 def create_unique_name(*args, **kwargs):
1486 return self.quote_name(self._create_index_name(*args, **kwargs))
1487
1488 else:
1489 create_unique_name = self._create_index_name
1490
1491 return IndexName(table, columns, "_uniq", create_unique_name)
1492
1493 def _delete_unique_sql(
1494 self,
1495 model,
1496 name,
1497 condition=None,
1498 deferrable=None,
1499 include=None,
1500 opclasses=None,
1501 expressions=None,
1502 ):
1503 if (
1504 (
1505 deferrable
1506 and not self.connection.features.supports_deferrable_unique_constraints
1507 )
1508 or (condition and not self.connection.features.supports_partial_indexes)
1509 or (include and not self.connection.features.supports_covering_indexes)
1510 or (
1511 expressions and not self.connection.features.supports_expression_indexes
1512 )
1513 ):
1514 return None
1515 if condition or include or opclasses or expressions:
1516 sql = self.sql_delete_index
1517 else:
1518 sql = self.sql_delete_unique
1519 return self._delete_constraint_sql(sql, model, name)
1520
1521 def _check_sql(self, name, check):
1522 return self.sql_constraint % {
1523 "name": self.quote_name(name),
1524 "constraint": self.sql_check_constraint % {"check": check},
1525 }
1526
1527 def _create_check_sql(self, model, name, check):
1528 if not self.connection.features.supports_table_check_constraints:
1529 return None
1530 return Statement(
1531 self.sql_create_check,
1532 table=Table(model._meta.db_table, self.quote_name),
1533 name=self.quote_name(name),
1534 check=check,
1535 )
1536
1537 def _delete_check_sql(self, model, name):
1538 if not self.connection.features.supports_table_check_constraints:
1539 return None
1540 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1541
1542 def _delete_constraint_sql(self, template, model, name):
1543 return Statement(
1544 template,
1545 table=Table(model._meta.db_table, self.quote_name),
1546 name=self.quote_name(name),
1547 )
1548
1549 def _constraint_names(
1550 self,
1551 model,
1552 column_names=None,
1553 unique=None,
1554 primary_key=None,
1555 index=None,
1556 foreign_key=None,
1557 check=None,
1558 type_=None,
1559 exclude=None,
1560 ):
1561 """Return all constraint names matching the columns and conditions."""
1562 if column_names is not None:
1563 column_names = [
1564 self.connection.introspection.identifier_converter(
1565 truncate_name(name, self.connection.ops.max_name_length())
1566 )
1567 if self.connection.features.truncates_names
1568 else self.connection.introspection.identifier_converter(name)
1569 for name in column_names
1570 ]
1571 with self.connection.cursor() as cursor:
1572 constraints = self.connection.introspection.get_constraints(
1573 cursor, model._meta.db_table
1574 )
1575 result = []
1576 for name, infodict in constraints.items():
1577 if column_names is None or column_names == infodict["columns"]:
1578 if unique is not None and infodict["unique"] != unique:
1579 continue
1580 if primary_key is not None and infodict["primary_key"] != primary_key:
1581 continue
1582 if index is not None and infodict["index"] != index:
1583 continue
1584 if check is not None and infodict["check"] != check:
1585 continue
1586 if foreign_key is not None and not infodict["foreign_key"]:
1587 continue
1588 if type_ is not None and infodict["type"] != type_:
1589 continue
1590 if not exclude or name not in exclude:
1591 result.append(name)
1592 return result
1593
1594 def _delete_primary_key(self, model, strict=False):
1595 constraint_names = self._constraint_names(model, primary_key=True)
1596 if strict and len(constraint_names) != 1:
1597 raise ValueError(
1598 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model._meta.db_table}"
1599 )
1600 for constraint_name in constraint_names:
1601 self.execute(self._delete_primary_key_sql(model, constraint_name))
1602
1603 def _create_primary_key_sql(self, model, field):
1604 return Statement(
1605 self.sql_create_pk,
1606 table=Table(model._meta.db_table, self.quote_name),
1607 name=self.quote_name(
1608 self._create_index_name(
1609 model._meta.db_table, [field.column], suffix="_pk"
1610 )
1611 ),
1612 columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1613 )
1614
1615 def _delete_primary_key_sql(self, model, name):
1616 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1617
1618 def _collate_sql(self, collation, old_collation=None, table_name=None):
1619 return "COLLATE " + self.quote_name(collation) if collation else ""