1from __future__ import annotations
2
3import logging
4import operator
5from abc import ABC, abstractmethod
6from collections.abc import Generator
7from datetime import datetime
8from typing import TYPE_CHECKING, Any
9
10if TYPE_CHECKING:
11 from typing import Self
12
13from plain.models.backends.ddl_references import (
14 Columns,
15 Expressions,
16 ForeignKeyName,
17 IndexName,
18 Statement,
19 Table,
20)
21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
22from plain.models.constraints import Deferrable
23from plain.models.fields import DbParameters, Field
24from plain.models.fields.related import ForeignKeyField, RelatedField
25from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
26from plain.models.indexes import Index
27from plain.models.sql.query import Query
28from plain.models.transaction import TransactionManagementError, atomic
29from plain.utils import timezone
30
31if TYPE_CHECKING:
32 from collections.abc import Iterable
33
34 from plain.models.backends.base.base import BaseDatabaseWrapper
35 from plain.models.base import Model
36 from plain.models.constraints import BaseConstraint
37 from plain.models.fields import Field
38 from plain.models.fields.related import ForeignKeyField, ManyToManyField
39 from plain.models.fields.reverse_related import ManyToManyRel
40
41logger = logging.getLogger("plain.models.backends.schema")
42
43
44def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
45 """
46 When altering the given field, must constraints on its model from the given
47 relation be temporarily dropped?
48 """
49 from plain.models.fields.related import ManyToManyField
50
51 field = relation.field
52 if isinstance(field, ManyToManyField):
53 # M2M reverse field
54 return False
55 if altered_field.primary_key:
56 # Foreign key constraint on the primary key, which is being altered.
57 return True
58 # ForeignKeyField always targets 'id'
59 return altered_field.name == "id"
60
61
62def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
63 # Related fields must be returned in a deterministic order.
64 return sorted(
65 model._model_meta._get_fields(
66 forward=False,
67 reverse=True,
68 ),
69 key=operator.attrgetter("name"),
70 )
71
72
73def _related_non_m2m_objects(
74 old_field: Field, new_field: Field
75) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
76 # Filter out m2m objects from reverse relations.
77 # Return (old_relation, new_relation) tuples.
78 related_fields = zip(
79 (
80 obj
81 for obj in _all_related_fields(old_field.model)
82 if _is_relevant_relation(obj, old_field)
83 ),
84 (
85 obj
86 for obj in _all_related_fields(new_field.model)
87 if _is_relevant_relation(obj, new_field)
88 ),
89 )
90 for old_rel, new_rel in related_fields:
91 yield old_rel, new_rel
92 yield from _related_non_m2m_objects(
93 old_rel.remote_field,
94 new_rel.remote_field,
95 )
96
97
98class BaseDatabaseSchemaEditor(ABC):
99 """
100 This class and its subclasses are responsible for emitting schema-changing
101 statements to the databases - model creation/removal/alteration, field
102 renaming, index fiddling, and so on.
103 """
104
105 # Overrideable SQL templates
106 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
107 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
108 sql_delete_table = "DROP TABLE %(table)s CASCADE"
109
110 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
111 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
112 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
113 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
114 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
115 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
116 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
117 sql_alter_column_no_default_null = sql_alter_column_no_default
118 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
119 sql_rename_column = (
120 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
121 )
122 sql_update_with_default = (
123 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
124 )
125
126 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
127 sql_check_constraint = "CHECK (%(check)s)"
128 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
129 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
130
131 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
132 sql_delete_check = sql_delete_constraint
133
134 sql_create_unique = (
135 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
136 "UNIQUE (%(columns)s)%(deferrable)s"
137 )
138 sql_delete_unique = sql_delete_constraint
139
140 sql_create_fk = (
141 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
142 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
143 )
144 sql_create_inline_fk = None
145 sql_create_column_inline_fk = None
146 sql_delete_fk = sql_delete_constraint
147
148 sql_create_index = (
149 "CREATE INDEX %(name)s ON %(table)s "
150 "(%(columns)s)%(include)s%(extra)s%(condition)s"
151 )
152 sql_create_unique_index = (
153 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
154 "(%(columns)s)%(include)s%(condition)s"
155 )
156 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
157 sql_delete_index = "DROP INDEX %(name)s"
158
159 sql_create_pk = (
160 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
161 )
162 sql_delete_pk = sql_delete_constraint
163
164 def __init__(
165 self,
166 connection: BaseDatabaseWrapper,
167 atomic: bool = True,
168 ):
169 self.connection = connection
170 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
171
172 # State-managing methods
173
174 def __enter__(self) -> Self:
175 self.deferred_sql: list[Any] = []
176 self.executed_sql: list[str] = []
177 if self.atomic_migration:
178 self.atomic = atomic()
179 self.atomic.__enter__()
180 return self
181
182 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
183 if exc_type is None:
184 for sql in self.deferred_sql:
185 self.execute(sql)
186 if self.atomic_migration:
187 self.atomic.__exit__(exc_type, exc_value, traceback)
188
189 # Core utility functions
190
191 def execute(
192 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
193 ) -> None:
194 """Execute the given SQL statement, with optional parameters."""
195 if (
196 self.connection.in_atomic_block
197 and not self.connection.features.can_rollback_ddl
198 ):
199 raise TransactionManagementError(
200 "Executing DDL statements while in a transaction on databases "
201 "that can't perform a rollback is prohibited."
202 )
203 # Account for non-string statement objects.
204 sql = str(sql)
205 # Log the command we're running, then run it
206 logger.debug(
207 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
208 )
209
210 # Track executed SQL for display in migration output
211 # Store the SQL for display (interpolate params for readability)
212 if params:
213 self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
214 else:
215 self.executed_sql.append(sql)
216
217 with self.connection.cursor() as cursor:
218 cursor.execute(sql, params)
219
220 def quote_name(self, name: str) -> str:
221 return self.connection.ops.quote_name(name)
222
223 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
224 """Take a model and return its table definition."""
225 # Create column SQL, add FK deferreds if needed.
226 column_sqls = []
227 params = []
228 for field in model._model_meta.local_fields:
229 # SQL.
230 definition, extra_params = self.column_sql(model, field)
231 if definition is None:
232 continue
233 # Check constraints can go on the column SQL here.
234 db_params = field.db_parameters(connection=self.connection)
235 if db_params["check"]:
236 definition += " " + self.sql_check_constraint % db_params
237 # Autoincrement SQL (for backends with inline variant).
238 col_type_suffix = field.db_type_suffix(connection=self.connection)
239 if col_type_suffix:
240 definition += f" {col_type_suffix}"
241 if extra_params:
242 params.extend(extra_params)
243 # FK.
244 if isinstance(field, ForeignKeyField) and field.db_constraint:
245 to_table = field.remote_field.model.model_options.db_table
246 field_name = field.remote_field.field_name
247 if field_name is None:
248 raise ValueError("Foreign key field_name cannot be None")
249 to_field = field.remote_field.model._model_meta.get_forward_field(
250 field_name
251 )
252 to_column = to_field.column
253 if self.sql_create_inline_fk:
254 definition += " " + self.sql_create_inline_fk % {
255 "to_table": self.quote_name(to_table),
256 "to_column": self.quote_name(to_column),
257 }
258 elif self.connection.features.supports_foreign_keys:
259 self.deferred_sql.append(
260 self._create_fk_sql(
261 model, field, "_fk_%(to_table)s_%(to_column)s"
262 )
263 )
264 # Add the SQL to our big list.
265 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
266 constraints = [
267 constraint.constraint_sql(model, self)
268 for constraint in model.model_options.constraints
269 ]
270 sql = self.sql_create_table % {
271 "table": self.quote_name(model.model_options.db_table),
272 "definition": ", ".join(
273 str(constraint)
274 for constraint in (*column_sqls, *constraints)
275 if constraint
276 ),
277 }
278 return sql, params
279
280 # Field <-> database mapping functions
281
282 def _iter_column_sql(
283 self,
284 column_db_type: str,
285 params: list[Any],
286 model: type[Model],
287 field: Field,
288 field_db_params: DbParameters,
289 include_default: bool,
290 ) -> Generator[str, None, None]:
291 yield column_db_type
292 # Work out nullability.
293 null = field.allow_null
294 # Include a default value, if requested.
295 include_default = (
296 include_default
297 and not self.skip_default(field)
298 and
299 # Don't include a default value if it's a nullable field and the
300 # default cannot be dropped in the ALTER COLUMN statement (e.g.
301 # MySQL longtext and longblob).
302 not (null and self.skip_default_on_alter(field))
303 )
304 if include_default:
305 default_value = self.effective_default(field)
306 if default_value is not None:
307 column_default = "DEFAULT " + self._column_default_sql(field)
308 if self.connection.features.requires_literal_defaults:
309 # Some databases can't take defaults as a parameter (Oracle).
310 # If this is the case, the individual schema backend should
311 # implement prepare_default().
312 yield column_default % self.prepare_default(default_value)
313 else:
314 yield column_default
315 params.append(default_value)
316
317 if not null:
318 yield "NOT NULL"
319 else:
320 yield "NULL"
321
322 if field.primary_key:
323 yield "PRIMARY KEY"
324
325 def column_sql(
326 self, model: type[Model], field: Field, include_default: bool = False
327 ) -> tuple[str | None, list[Any] | None]:
328 """
329 Return the column definition for a field. The field must already have
330 had set_attributes_from_name() called.
331 """
332 # Get the column's type and use that as the basis of the SQL.
333 field_db_params = field.db_parameters(connection=self.connection)
334 column_db_type = field_db_params["type"]
335 # Check for fields that aren't actually columns (e.g. M2M).
336 if column_db_type is None:
337 return None, None
338 params: list[Any] = []
339 return (
340 " ".join(
341 # This appends to the params being returned.
342 self._iter_column_sql(
343 column_db_type,
344 params,
345 model,
346 field,
347 field_db_params,
348 include_default,
349 )
350 ),
351 params,
352 )
353
354 def skip_default(self, field: Field) -> bool:
355 """
356 Some backends don't accept default values for certain columns types
357 (i.e. MySQL longtext and longblob).
358 """
359 return False
360
361 def skip_default_on_alter(self, field: Field) -> bool:
362 """
363 Some backends don't accept default values for certain columns types
364 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
365 """
366 return False
367
368 def prepare_default(self, value: Any) -> str:
369 """
370 Only used for backends which have requires_literal_defaults feature
371 """
372 raise NotImplementedError(
373 "subclasses of BaseDatabaseSchemaEditor for backends which have "
374 "requires_literal_defaults must provide a prepare_default() method"
375 )
376
377 def _column_default_sql(self, field: Field) -> str:
378 """
379 Return the SQL to use in a DEFAULT clause. The resulting string should
380 contain a '%s' placeholder for a default value.
381 """
382 return "%s"
383
384 @staticmethod
385 def _effective_default(field: Field) -> Any:
386 # This method allows testing its logic without a connection.
387 if field.has_default():
388 default = field.get_default()
389 elif (
390 not field.allow_null and not field.required and field.empty_strings_allowed
391 ):
392 if field.get_internal_type() == "BinaryField":
393 default = b""
394 else:
395 default = ""
396 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
397 internal_type = field.get_internal_type()
398 if internal_type == "DateTimeField":
399 default = timezone.now()
400 else:
401 default = datetime.now()
402 if internal_type == "DateField":
403 default = default.date()
404 elif internal_type == "TimeField":
405 default = default.time()
406 else:
407 default = None
408 return default
409
410 def effective_default(self, field: Field) -> Any:
411 """Return a field's effective database default value."""
412 return field.get_db_prep_save(self._effective_default(field), self.connection)
413
414 @abstractmethod
415 def quote_value(self, value: Any) -> str:
416 """
417 Return a quoted version of the value so it's safe to use in an SQL
418 string. This is not safe against injection from user code; it is
419 intended only for use in making SQL scripts or preparing default values
420 for particularly tricky backends (defaults are not user-defined, though,
421 so this is safe).
422 """
423 ...
424
425 # Actions
426
427 def create_model(self, model: type[Model]) -> None:
428 """
429 Create a table and any accompanying indexes or unique constraints for
430 the given `model`.
431 """
432 sql, params = self.table_sql(model)
433 # Prevent using [] as params, in the case a literal '%' is used in the
434 # definition.
435 self.execute(sql, params or None)
436
437 # Add any field index (deferred as SQLite _remake_table needs it).
438 self.deferred_sql.extend(self._model_indexes_sql(model))
439
440 def delete_model(self, model: type[Model]) -> None:
441 """Delete a model from the database."""
442
443 # Delete the table
444 self.execute(
445 self.sql_delete_table
446 % {
447 "table": self.quote_name(model.model_options.db_table),
448 }
449 )
450 # Remove all deferred statements referencing the deleted table.
451 for sql in list(self.deferred_sql):
452 if isinstance(sql, Statement) and sql.references_table(
453 model.model_options.db_table
454 ):
455 self.deferred_sql.remove(sql)
456
457 def add_index(self, model: type[Model], index: Index) -> None:
458 """Add an index on a model."""
459 if (
460 index.contains_expressions
461 and not self.connection.features.supports_expression_indexes
462 ):
463 return None
464 # Index.create_sql returns interpolated SQL which makes params=None a
465 # necessity to avoid escaping attempts on execution.
466 self.execute(index.create_sql(model, self), params=None)
467
468 def remove_index(self, model: type[Model], index: Index) -> None:
469 """Remove an index from a model."""
470 if (
471 index.contains_expressions
472 and not self.connection.features.supports_expression_indexes
473 ):
474 return None
475 self.execute(index.remove_sql(model, self))
476
477 def rename_index(
478 self, model: type[Model], old_index: Index, new_index: Index
479 ) -> None:
480 if self.connection.features.can_rename_index:
481 self.execute(
482 self._rename_index_sql(model, old_index.name, new_index.name),
483 params=None,
484 )
485 else:
486 self.remove_index(model, old_index)
487 self.add_index(model, new_index)
488
489 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
490 """Add a constraint to a model."""
491 sql = constraint.create_sql(model, self)
492 if sql:
493 # Constraint.create_sql returns interpolated SQL which makes
494 # params=None a necessity to avoid escaping attempts on execution.
495 self.execute(sql, params=None)
496
497 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
498 """Remove a constraint from a model."""
499 sql = constraint.remove_sql(model, self)
500 if sql:
501 self.execute(sql)
502
503 def alter_db_table(
504 self, model: type[Model], old_db_table: str, new_db_table: str
505 ) -> None:
506 """Rename the table a model points to."""
507 if old_db_table == new_db_table or (
508 self.connection.features.ignores_table_name_case
509 and old_db_table.lower() == new_db_table.lower()
510 ):
511 return
512 self.execute(
513 self.sql_rename_table
514 % {
515 "old_table": self.quote_name(old_db_table),
516 "new_table": self.quote_name(new_db_table),
517 }
518 )
519 # Rename all references to the old table name.
520 for sql in self.deferred_sql:
521 if isinstance(sql, Statement):
522 sql.rename_table_references(old_db_table, new_db_table)
523
524 def add_field(self, model: type[Model], field: Field) -> None:
525 """
526 Create a field on a model. Usually involves adding a column, but may
527 involve adding a table instead (for M2M fields).
528 """
529 # Get the column's definition
530 definition, params = self.column_sql(model, field, include_default=True)
531 # It might not actually have a column behind it
532 if definition is None:
533 return
534 if col_type_suffix := field.db_type_suffix(connection=self.connection):
535 definition += f" {col_type_suffix}"
536 # Check constraints can go on the column SQL here
537 db_params = field.db_parameters(connection=self.connection)
538 if db_params["check"]:
539 definition += " " + self.sql_check_constraint % db_params
540 if (
541 isinstance(field, ForeignKeyField)
542 and self.connection.features.supports_foreign_keys
543 and field.db_constraint
544 ):
545 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
546 # Add FK constraint inline, if supported.
547 if self.sql_create_column_inline_fk:
548 to_table = field.remote_field.model.model_options.db_table
549 field_name = field.remote_field.field_name
550 if field_name is None:
551 raise ValueError("Foreign key field_name cannot be None")
552 to_field = field.remote_field.model._model_meta.get_forward_field(
553 field_name
554 )
555 to_column = to_field.column
556 namespace, _ = split_identifier(model.model_options.db_table)
557 definition += " " + self.sql_create_column_inline_fk % {
558 "name": self._fk_constraint_name(model, field, constraint_suffix),
559 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
560 "column": self.quote_name(field.column),
561 "to_table": self.quote_name(to_table),
562 "to_column": self.quote_name(to_column),
563 "deferrable": self.connection.ops.deferrable_sql(),
564 }
565 # Otherwise, add FK constraints later.
566 else:
567 self.deferred_sql.append(
568 self._create_fk_sql(model, field, constraint_suffix)
569 )
570 # Build the SQL and run it
571 sql = self.sql_create_column % {
572 "table": self.quote_name(model.model_options.db_table),
573 "column": self.quote_name(field.column),
574 "definition": definition,
575 }
576 self.execute(sql, params)
577 # Drop the default if we need to
578 # (Plain usually does not use in-database defaults)
579 if (
580 not self.skip_default_on_alter(field)
581 and self.effective_default(field) is not None
582 ):
583 changes_sql, params = self._alter_column_default_sql(
584 model, None, field, drop=True
585 )
586 sql = self.sql_alter_column % {
587 "table": self.quote_name(model.model_options.db_table),
588 "changes": changes_sql,
589 }
590 self.execute(sql, params)
591 # Add an index, if required
592 self.deferred_sql.extend(self._field_indexes_sql(model, field))
593
594 def remove_field(self, model: type[Model], field: Field) -> None:
595 """
596 Remove a field from a model. Usually involves deleting a column,
597 but for M2Ms may involve deleting a table.
598 """
599 # It might not actually have a column behind it
600 if field.db_parameters(connection=self.connection)["type"] is None:
601 return
602 # Drop any FK constraints, MySQL requires explicit deletion
603 if isinstance(field, RelatedField):
604 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
605 for fk_name in fk_names:
606 self.execute(self._delete_fk_sql(model, fk_name))
607 # Delete the column
608 sql = self.sql_delete_column % {
609 "table": self.quote_name(model.model_options.db_table),
610 "column": self.quote_name(field.column),
611 }
612 self.execute(sql)
613 # Remove all deferred statements referencing the deleted column.
614 for sql in list(self.deferred_sql):
615 if isinstance(sql, Statement) and sql.references_column(
616 model.model_options.db_table, field.column
617 ):
618 self.deferred_sql.remove(sql)
619
620 def alter_field(
621 self,
622 model: type[Model],
623 old_field: Field,
624 new_field: Field,
625 strict: bool = False,
626 ) -> None:
627 """
628 Allow a field's type, uniqueness, nullability, default, column,
629 constraints, etc. to be modified.
630 `old_field` is required to compute the necessary changes.
631 If `strict` is True, raise errors if the old column does not match
632 `old_field` precisely.
633 """
634 if not self._field_should_be_altered(old_field, new_field):
635 return
636 # Ensure this field is even column-based
637 old_db_params = old_field.db_parameters(connection=self.connection)
638 old_type = old_db_params["type"]
639 new_db_params = new_field.db_parameters(connection=self.connection)
640 new_type = new_db_params["type"]
641 if (old_type is None and not isinstance(old_field, RelatedField)) or (
642 new_type is None and not isinstance(new_field, RelatedField)
643 ):
644 raise ValueError(
645 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
646 "db_type (are you using a badly-written custom field?)",
647 )
648 elif (
649 old_type is None
650 and new_type is None
651 and isinstance(old_field, RelatedField)
652 and isinstance(old_field.remote_field, ManyToManyRel)
653 and isinstance(new_field, RelatedField)
654 and isinstance(new_field.remote_field, ManyToManyRel)
655 ):
656 # Both sides have through models; this is a no-op.
657 return
658 elif old_type is None or new_type is None:
659 raise ValueError(
660 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
661 "(you cannot alter to or from M2M fields, or add or remove "
662 "through= on M2M fields)"
663 )
664
665 self._alter_field(
666 model,
667 old_field,
668 new_field,
669 old_type,
670 new_type,
671 old_db_params,
672 new_db_params,
673 strict,
674 )
675
676 def _field_db_check(
677 self, field: Field, field_db_params: DbParameters
678 ) -> str | None:
679 # Always check constraints with the same mocked column name to avoid
680 # recreating constrains when the column is renamed.
681 check_constraints = self.connection.data_type_check_constraints
682 data = field.db_type_parameters(self.connection)
683 data["column"] = "__column_name__"
684 try:
685 return check_constraints[field.get_internal_type()] % data
686 except KeyError:
687 return None
688
689 def _alter_field(
690 self,
691 model: type[Model],
692 old_field: Field,
693 new_field: Field,
694 old_type: str,
695 new_type: str,
696 old_db_params: DbParameters,
697 new_db_params: DbParameters,
698 strict: bool = False,
699 ) -> None:
700 """Perform a "physical" (non-ManyToMany) field update."""
701 # Drop any FK constraints, we'll remake them later
702 fks_dropped = set()
703 if (
704 self.connection.features.supports_foreign_keys
705 and isinstance(old_field, ForeignKeyField)
706 and old_field.db_constraint
707 and self._field_should_be_altered(
708 old_field,
709 new_field,
710 )
711 ):
712 fk_names = self._constraint_names(
713 model, [old_field.column], foreign_key=True
714 )
715 if strict and len(fk_names) != 1:
716 raise ValueError(
717 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
718 )
719 for fk_name in fk_names:
720 fks_dropped.add((old_field.column,))
721 self.execute(self._delete_fk_sql(model, fk_name))
722 # Has unique been removed?
723 if old_field.primary_key and (
724 not new_field.primary_key
725 or self._field_became_primary_key(old_field, new_field)
726 ):
727 # Find the unique constraint for this field
728 meta_constraint_names = {
729 constraint.name for constraint in model.model_options.constraints
730 }
731 constraint_names = self._constraint_names(
732 model,
733 [old_field.column],
734 unique=True,
735 primary_key=False,
736 exclude=meta_constraint_names,
737 )
738 if strict and len(constraint_names) != 1:
739 raise ValueError(
740 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
741 )
742 for constraint_name in constraint_names:
743 sql = self._delete_unique_sql(model, constraint_name)
744 if sql is not None:
745 self.execute(sql)
746 # Drop incoming FK constraints if the field is a primary key or unique,
747 # which might be a to_field target, and things are going to change.
748 drop_foreign_keys = (
749 self.connection.features.supports_foreign_keys
750 and (old_field.primary_key and new_field.primary_key)
751 and old_type != new_type
752 )
753 if drop_foreign_keys:
754 # '_model_meta.related_field' also contains M2M reverse fields, these
755 # will be filtered out
756 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
757 rel_fk_names = self._constraint_names(
758 new_rel.related_model, [new_rel.field.column], foreign_key=True
759 )
760 for fk_name in rel_fk_names:
761 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
762 # Removed an index? (no strict check, as multiple indexes are possible)
763 # Remove indexes if db_index switched to False or a unique constraint
764 # will now be used in lieu of an index. The following lines from the
765 # truth table show all True cases; the rest are False:
766 #
767 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
768 # ------------------------------------------------------------------------------
769 # True | False | False | False
770 # True | False | False | True
771 # True | False | True | True
772 if (
773 isinstance(old_field, ForeignKeyField)
774 and old_field.db_index
775 and not old_field.primary_key
776 and (
777 not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
778 or new_field.primary_key
779 )
780 ):
781 # Find the index for this field
782 meta_index_names = {index.name for index in model.model_options.indexes}
783 # Retrieve only BTREE indexes since this is what's created with
784 # db_index=True.
785 index_names = self._constraint_names(
786 model,
787 [old_field.column],
788 index=True,
789 type_=Index.suffix,
790 exclude=meta_index_names,
791 )
792 for index_name in index_names:
793 # The only way to check if an index was created with
794 # db_index=True or with Index(['field'], name='foo')
795 # is to look at its name (refs #28053).
796 self.execute(self._delete_index_sql(model, index_name))
797 # Change check constraints?
798 old_db_check = self._field_db_check(old_field, old_db_params)
799 new_db_check = self._field_db_check(new_field, new_db_params)
800 if old_db_check != new_db_check and old_db_check:
801 meta_constraint_names = {
802 constraint.name for constraint in model.model_options.constraints
803 }
804 constraint_names = self._constraint_names(
805 model,
806 [old_field.column],
807 check=True,
808 exclude=meta_constraint_names,
809 )
810 if strict and len(constraint_names) != 1:
811 raise ValueError(
812 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
813 )
814 for constraint_name in constraint_names:
815 sql = self._delete_check_sql(model, constraint_name)
816 if sql is not None:
817 self.execute(sql)
818 # Have they renamed the column?
819 if old_field.column != new_field.column:
820 self.execute(
821 self._rename_field_sql(
822 model.model_options.db_table, old_field, new_field, new_type
823 )
824 )
825 # Rename all references to the renamed column.
826 for sql in self.deferred_sql:
827 if isinstance(sql, Statement):
828 sql.rename_column_references(
829 model.model_options.db_table, old_field.column, new_field.column
830 )
831 # Next, start accumulating actions to do
832 actions = []
833 null_actions = []
834 post_actions = []
835 # Type suffix change? (e.g. auto increment).
836 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
837 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
838 # Type change?
839 if old_type != new_type or old_type_suffix != new_type_suffix:
840 fragment, other_actions = self._alter_column_type_sql(
841 model, old_field, new_field, new_type
842 )
843 actions.append(fragment)
844 post_actions.extend(other_actions)
845 # When changing a column NULL constraint to NOT NULL with a given
846 # default value, we need to perform 4 steps:
847 # 1. Add a default for new incoming writes
848 # 2. Update existing NULL rows with new default
849 # 3. Replace NULL constraint with NOT NULL
850 # 4. Drop the default again.
851 # Default change?
852 needs_database_default = False
853 if old_field.allow_null and not new_field.allow_null:
854 old_default = self.effective_default(old_field)
855 new_default = self.effective_default(new_field)
856 if (
857 not self.skip_default_on_alter(new_field)
858 and old_default != new_default
859 and new_default is not None
860 ):
861 needs_database_default = True
862 actions.append(
863 self._alter_column_default_sql(model, old_field, new_field)
864 )
865 # Nullability change?
866 if old_field.allow_null != new_field.allow_null:
867 fragment = self._alter_column_null_sql(model, old_field, new_field)
868 if fragment:
869 null_actions.append(fragment)
870 # Only if we have a default and there is a change from NULL to NOT NULL
871 four_way_default_alteration = new_field.has_default() and (
872 old_field.allow_null and not new_field.allow_null
873 )
874 if actions or null_actions:
875 if not four_way_default_alteration:
876 # If we don't have to do a 4-way default alteration we can
877 # directly run a (NOT) NULL alteration
878 actions += null_actions
879 # Combine actions together if we can (e.g. postgres)
880 if self.connection.features.supports_combined_alters and actions:
881 sql, params = tuple(zip(*actions))
882 actions = [(", ".join(sql), sum(params, []))]
883 # Apply those actions
884 for sql, params in actions:
885 self.execute(
886 self.sql_alter_column
887 % {
888 "table": self.quote_name(model.model_options.db_table),
889 "changes": sql,
890 },
891 params,
892 )
893 if four_way_default_alteration:
894 # Update existing rows with default value
895 self.execute(
896 self.sql_update_with_default
897 % {
898 "table": self.quote_name(model.model_options.db_table),
899 "column": self.quote_name(new_field.column),
900 "default": "%s",
901 },
902 [new_default],
903 )
904 # Since we didn't run a NOT NULL change before we need to do it
905 # now
906 for sql, params in null_actions:
907 self.execute(
908 self.sql_alter_column
909 % {
910 "table": self.quote_name(model.model_options.db_table),
911 "changes": sql,
912 },
913 params,
914 )
915 if post_actions:
916 for sql, params in post_actions:
917 self.execute(sql, params)
918 # If primary_key changed to False, delete the primary key constraint.
919 if old_field.primary_key and not new_field.primary_key:
920 self._delete_primary_key(model, strict)
921
922 # Added an index? Add an index if db_index switched to True or a unique
923 # constraint will no longer be used in lieu of an index. The following
924 # lines from the truth table show all True cases; the rest are False:
925 #
926 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
927 # ------------------------------------------------------------------------------
928 # False | False | True | False
929 # False | True | True | False
930 # True | True | True | False
931 if (
932 (
933 not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
934 or old_field.primary_key
935 )
936 and isinstance(new_field, ForeignKeyField)
937 and new_field.db_index
938 and not new_field.primary_key
939 ):
940 self.execute(self._create_index_sql(model, fields=[new_field]))
941 # Type alteration on primary key? Then we need to alter the column
942 # referring to us.
943 rels_to_update = []
944 if drop_foreign_keys:
945 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
946 # Changed to become primary key?
947 if self._field_became_primary_key(old_field, new_field):
948 # Make the new one
949 self.execute(self._create_primary_key_sql(model, new_field))
950 # Update all referencing columns
951 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
952 # Handle our type alters on the other end of rels from the PK stuff above
953 for old_rel, new_rel in rels_to_update:
954 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
955 rel_type = rel_db_params["type"]
956 fragment, other_actions = self._alter_column_type_sql(
957 new_rel.related_model,
958 old_rel.field,
959 new_rel.field,
960 rel_type,
961 )
962 self.execute(
963 self.sql_alter_column
964 % {
965 "table": self.quote_name(
966 new_rel.related_model.model_options.db_table
967 ),
968 "changes": fragment[0],
969 },
970 fragment[1],
971 )
972 for sql, params in other_actions:
973 self.execute(sql, params)
974 # Does it have a foreign key?
975 if (
976 self.connection.features.supports_foreign_keys
977 and isinstance(new_field, ForeignKeyField)
978 and (
979 fks_dropped
980 or not isinstance(old_field, ForeignKeyField)
981 or not old_field.db_constraint
982 )
983 and new_field.db_constraint
984 ):
985 self.execute(
986 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
987 )
988 # Rebuild FKs that pointed to us if we previously had to drop them
989 if drop_foreign_keys:
990 for _, rel in rels_to_update:
991 if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
992 self.execute(
993 self._create_fk_sql(rel.related_model, rel.field, "_fk")
994 )
995 # Does it have check constraints we need to add?
996 if old_db_check != new_db_check and new_db_check:
997 constraint_name = self._create_index_name(
998 model.model_options.db_table, [new_field.column], suffix="_check"
999 )
1000 new_check = new_db_params["check"]
1001 assert new_check is not None # Guaranteed by new_db_check check above
1002 sql = self._create_check_sql(model, constraint_name, new_check)
1003 if sql is not None:
1004 self.execute(sql)
1005 # Drop the default if we need to
1006 # (Plain usually does not use in-database defaults)
1007 if needs_database_default:
1008 changes_sql, params = self._alter_column_default_sql(
1009 model, old_field, new_field, drop=True
1010 )
1011 sql = self.sql_alter_column % {
1012 "table": self.quote_name(model.model_options.db_table),
1013 "changes": changes_sql,
1014 }
1015 self.execute(sql, params)
1016
1017 def _alter_column_null_sql(
1018 self, model: type[Model], old_field: Field, new_field: Field
1019 ) -> tuple[str, list[Any]]:
1020 """
1021 Hook to specialize column null alteration.
1022
1023 Return a (sql, params) fragment to set a column to null or non-null
1024 as required by new_field, or None if no changes are required.
1025 """
1026 new_db_params = new_field.db_parameters(connection=self.connection)
1027 sql = (
1028 self.sql_alter_column_null
1029 if new_field.allow_null
1030 else self.sql_alter_column_not_null
1031 )
1032 return (
1033 sql
1034 % {
1035 "column": self.quote_name(new_field.column),
1036 "type": new_db_params["type"],
1037 },
1038 [],
1039 )
1040
1041 def _alter_column_default_sql(
1042 self,
1043 model: type[Model],
1044 old_field: Field | None,
1045 new_field: Field,
1046 drop: bool = False,
1047 ) -> tuple[str, list[Any]]:
1048 """
1049 Hook to specialize column default alteration.
1050
1051 Return a (sql, params) fragment to add or drop (depending on the drop
1052 argument) a default to new_field's column.
1053 """
1054 new_default = self.effective_default(new_field)
1055 default = self._column_default_sql(new_field)
1056 params = [new_default]
1057
1058 if drop:
1059 params = []
1060 elif self.connection.features.requires_literal_defaults:
1061 # Some databases (Oracle) can't take defaults as a parameter
1062 # If this is the case, the SchemaEditor for that database should
1063 # implement prepare_default().
1064 default = self.prepare_default(new_default)
1065 params = []
1066
1067 new_db_params = new_field.db_parameters(connection=self.connection)
1068 if drop:
1069 if new_field.allow_null:
1070 sql = self.sql_alter_column_no_default_null
1071 else:
1072 sql = self.sql_alter_column_no_default
1073 else:
1074 sql = self.sql_alter_column_default
1075 return (
1076 sql
1077 % {
1078 "column": self.quote_name(new_field.column),
1079 "type": new_db_params["type"],
1080 "default": default,
1081 },
1082 params,
1083 )
1084
1085 def _alter_column_type_sql(
1086 self,
1087 model: type[Model],
1088 old_field: Field,
1089 new_field: Field,
1090 new_type: str,
1091 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1092 """
1093 Hook to specialize column type alteration for different backends,
1094 for cases when a creation type is different to an alteration type
1095 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1096
1097 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1098 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1099 run once the field is altered.
1100 """
1101 other_actions: list[tuple[str, list[Any]]] = []
1102 return (
1103 (
1104 self.sql_alter_column_type
1105 % {
1106 "column": self.quote_name(new_field.column),
1107 "type": new_type,
1108 },
1109 [],
1110 ),
1111 other_actions,
1112 )
1113
1114 def _alter_many_to_many(
1115 self,
1116 model: type[Model],
1117 old_field: ManyToManyField,
1118 new_field: ManyToManyField,
1119 strict: bool,
1120 ) -> None:
1121 """Alter M2Ms to repoint their to= endpoints."""
1122 # Type narrow for ManyToManyField.remote_field
1123 old_rel: ManyToManyRel = old_field.remote_field
1124 new_rel: ManyToManyRel = new_field.remote_field
1125
1126 # Rename the through table
1127 if (
1128 old_rel.through.model_options.db_table
1129 != new_rel.through.model_options.db_table
1130 ):
1131 self.alter_db_table(
1132 old_rel.through,
1133 old_rel.through.model_options.db_table,
1134 new_rel.through.model_options.db_table,
1135 )
1136 # Repoint the FK to the other side
1137 old_reverse_field = old_rel.through._model_meta.get_forward_field(
1138 old_field.m2m_reverse_field_name()
1139 )
1140 new_reverse_field = new_rel.through._model_meta.get_forward_field(
1141 new_field.m2m_reverse_field_name()
1142 )
1143 self.alter_field(
1144 new_rel.through,
1145 # The field that points to the target model is needed, so we can
1146 # tell alter_field to change it - this is m2m_reverse_field_name()
1147 # (as opposed to m2m_field_name(), which points to our model).
1148 old_reverse_field,
1149 new_reverse_field,
1150 )
1151 old_m2m_field = old_rel.through._model_meta.get_forward_field(
1152 old_field.m2m_field_name()
1153 )
1154 new_m2m_field = new_rel.through._model_meta.get_forward_field(
1155 new_field.m2m_field_name()
1156 )
1157 self.alter_field(
1158 new_rel.through,
1159 # for self-referential models we need to alter field from the other end too
1160 old_m2m_field,
1161 new_m2m_field,
1162 )
1163
1164 def _create_index_name(
1165 self, table_name: str, column_names: list[str], suffix: str = ""
1166 ) -> str:
1167 """
1168 Generate a unique name for an index/unique constraint.
1169
1170 The name is divided into 3 parts: the table name, the column names,
1171 and a unique digest and suffix.
1172 """
1173 _, table_name = split_identifier(table_name)
1174 hash_suffix_part = (
1175 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1176 )
1177 max_length = self.connection.ops.max_name_length() or 200
1178 # If everything fits into max_length, use that name.
1179 index_name = "{}_{}_{}".format(
1180 table_name, "_".join(column_names), hash_suffix_part
1181 )
1182 if len(index_name) <= max_length:
1183 return index_name
1184 # Shorten a long suffix.
1185 if len(hash_suffix_part) > max_length / 3:
1186 hash_suffix_part = hash_suffix_part[: max_length // 3]
1187 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1188 index_name = "{}_{}_{}".format(
1189 table_name[:other_length],
1190 "_".join(column_names)[:other_length],
1191 hash_suffix_part,
1192 )
1193 # Prepend D if needed to prevent the name from starting with an
1194 # underscore or a number (not permitted on Oracle).
1195 if index_name[0] == "_" or index_name[0].isdigit():
1196 index_name = f"D{index_name[:-1]}"
1197 return index_name
1198
1199 def _index_condition_sql(self, condition: str | None) -> str:
1200 if condition:
1201 return " WHERE " + condition
1202 return ""
1203
1204 def _index_include_sql(
1205 self, model: type[Model], columns: list[str] | None
1206 ) -> str | Statement:
1207 if not columns or not self.connection.features.supports_covering_indexes:
1208 return ""
1209 return Statement(
1210 " INCLUDE (%(columns)s)",
1211 columns=Columns(model.model_options.db_table, columns, self.quote_name),
1212 )
1213
1214 def _create_index_sql(
1215 self,
1216 model: type[Model],
1217 *,
1218 fields: list[Field] | None = None,
1219 name: str | None = None,
1220 suffix: str = "",
1221 using: str = "",
1222 col_suffixes: tuple[str, ...] = (),
1223 sql: str | None = None,
1224 opclasses: tuple[str, ...] = (),
1225 condition: str | None = None,
1226 include: list[str] | None = None,
1227 expressions: Any = None,
1228 ) -> Statement:
1229 """
1230 Return the SQL statement to create the index for one or several fields
1231 or expressions. `sql` can be specified if the syntax differs from the
1232 standard (GIS indexes, ...).
1233 """
1234 fields = fields or []
1235 expressions = expressions or []
1236 compiler = Query(model, alias_cols=False).get_compiler()
1237 columns = [field.column for field in fields]
1238 sql_create_index = sql or self.sql_create_index
1239 table = model.model_options.db_table
1240
1241 def create_index_name(*args: Any, **kwargs: Any) -> str:
1242 nonlocal name
1243 if name is None:
1244 name = self._create_index_name(*args, **kwargs)
1245 return self.quote_name(name)
1246
1247 return Statement(
1248 sql_create_index,
1249 table=Table(table, self.quote_name),
1250 name=IndexName(table, columns, suffix, create_index_name),
1251 using=using,
1252 columns=(
1253 self._index_columns(table, columns, col_suffixes, opclasses)
1254 if columns
1255 else Expressions(table, expressions, compiler, self.quote_value)
1256 ),
1257 extra="",
1258 condition=self._index_condition_sql(condition),
1259 include=self._index_include_sql(model, include),
1260 )
1261
1262 def _delete_index_sql(
1263 self, model: type[Model], name: str, sql: str | None = None
1264 ) -> Statement:
1265 return Statement(
1266 sql or self.sql_delete_index,
1267 table=Table(model.model_options.db_table, self.quote_name),
1268 name=self.quote_name(name),
1269 )
1270
1271 def _rename_index_sql(
1272 self, model: type[Model], old_name: str, new_name: str
1273 ) -> Statement:
1274 return Statement(
1275 self.sql_rename_index,
1276 table=Table(model.model_options.db_table, self.quote_name),
1277 old_name=self.quote_name(old_name),
1278 new_name=self.quote_name(new_name),
1279 )
1280
1281 def _index_columns(
1282 self,
1283 table: str,
1284 columns: list[str],
1285 col_suffixes: tuple[str, ...],
1286 opclasses: tuple[str, ...],
1287 ) -> Columns:
1288 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1289
1290 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1291 """
1292 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1293 """
1294 output: list[Statement | None] = []
1295 for field in model._model_meta.local_fields:
1296 output.extend(self._field_indexes_sql(model, field))
1297
1298 for index in model.model_options.indexes:
1299 if (
1300 not index.contains_expressions
1301 or self.connection.features.supports_expression_indexes
1302 ):
1303 output.append(index.create_sql(model, self))
1304 return output
1305
1306 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1307 """
1308 Return a list of all index SQL statements for the specified field.
1309 """
1310 output: list[Statement] = []
1311 if self._field_should_be_indexed(model, field):
1312 output.append(self._create_index_sql(model, fields=[field]))
1313 return output
1314
1315 def _field_should_be_altered(
1316 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1317 ) -> bool:
1318 ignore = ignore or set()
1319 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1320 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1321 # Don't alter when:
1322 # - changing only a field name
1323 # - changing an attribute that doesn't affect the schema
1324 # - changing an attribute in the provided set of ignored attributes
1325 for attr in ignore.union(old_field.non_db_attrs):
1326 old_kwargs.pop(attr, None)
1327 for attr in ignore.union(new_field.non_db_attrs):
1328 new_kwargs.pop(attr, None)
1329 return self.quote_name(old_field.column) != self.quote_name(
1330 new_field.column
1331 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1332
1333 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1334 if isinstance(field, ForeignKeyField):
1335 return bool(field.remote_field) and field.db_index and not field.primary_key
1336 return False
1337
1338 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1339 return not old_field.primary_key and new_field.primary_key
1340
1341 def _rename_field_sql(
1342 self, table: str, old_field: Field, new_field: Field, new_type: str
1343 ) -> str:
1344 return self.sql_rename_column % {
1345 "table": self.quote_name(table),
1346 "old_column": self.quote_name(old_field.column),
1347 "new_column": self.quote_name(new_field.column),
1348 "type": new_type,
1349 }
1350
1351 def _create_fk_sql(
1352 self, model: type[Model], field: ForeignKeyField, suffix: str
1353 ) -> Statement:
1354 table = Table(model.model_options.db_table, self.quote_name)
1355 name = self._fk_constraint_name(model, field, suffix)
1356 column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1357 to_table = Table(
1358 field.target_field.model.model_options.db_table, self.quote_name
1359 )
1360 to_column = Columns(
1361 field.target_field.model.model_options.db_table,
1362 [field.target_field.column],
1363 self.quote_name,
1364 )
1365 deferrable = self.connection.ops.deferrable_sql()
1366 return Statement(
1367 self.sql_create_fk,
1368 table=table,
1369 name=name,
1370 column=column,
1371 to_table=to_table,
1372 to_column=to_column,
1373 deferrable=deferrable,
1374 )
1375
1376 def _fk_constraint_name(
1377 self, model: type[Model], field: ForeignKeyField, suffix: str
1378 ) -> ForeignKeyName:
1379 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1380 return self.quote_name(self._create_index_name(*args, **kwargs))
1381
1382 return ForeignKeyName(
1383 model.model_options.db_table,
1384 [field.column],
1385 split_identifier(field.target_field.model.model_options.db_table)[1],
1386 [field.target_field.column],
1387 suffix,
1388 create_fk_name,
1389 )
1390
1391 def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1392 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1393
1394 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1395 if deferrable is None:
1396 return ""
1397 if deferrable == Deferrable.DEFERRED:
1398 return " DEFERRABLE INITIALLY DEFERRED"
1399 if deferrable == Deferrable.IMMEDIATE:
1400 return " DEFERRABLE INITIALLY IMMEDIATE"
1401 return ""
1402
1403 def _unique_sql(
1404 self,
1405 model: type[Model],
1406 fields: Iterable[Field],
1407 name: str,
1408 condition: str | None = None,
1409 deferrable: Deferrable | None = None,
1410 include: list[str] | None = None,
1411 opclasses: tuple[str, ...] | None = None,
1412 expressions: Any = None,
1413 ) -> str | None:
1414 if (
1415 deferrable
1416 and not self.connection.features.supports_deferrable_unique_constraints
1417 ):
1418 return None
1419 if condition or include or opclasses or expressions:
1420 # Databases support conditional, covering, and functional unique
1421 # constraints via a unique index.
1422 sql = self._create_unique_sql(
1423 model,
1424 fields,
1425 name=name,
1426 condition=condition,
1427 include=include,
1428 opclasses=opclasses,
1429 expressions=expressions,
1430 )
1431 if sql:
1432 self.deferred_sql.append(sql)
1433 return None
1434 constraint = self.sql_unique_constraint % {
1435 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1436 "deferrable": self._deferrable_constraint_sql(deferrable),
1437 }
1438 return self.sql_constraint % {
1439 "name": self.quote_name(name),
1440 "constraint": constraint,
1441 }
1442
1443 def _create_unique_sql(
1444 self,
1445 model: type[Model],
1446 fields: Iterable[Field],
1447 name: str | None = None,
1448 condition: str | None = None,
1449 deferrable: Deferrable | None = None,
1450 include: list[str] | None = None,
1451 opclasses: tuple[str, ...] | None = None,
1452 expressions: Any = None,
1453 ) -> Statement | None:
1454 if (
1455 (
1456 deferrable
1457 and not self.connection.features.supports_deferrable_unique_constraints
1458 )
1459 or (condition and not self.connection.features.supports_partial_indexes)
1460 or (include and not self.connection.features.supports_covering_indexes)
1461 or (
1462 expressions and not self.connection.features.supports_expression_indexes
1463 )
1464 ):
1465 return None
1466
1467 compiler = Query(model, alias_cols=False).get_compiler()
1468 table = model.model_options.db_table
1469 columns = [field.column for field in fields]
1470 constraint_name: IndexName | str
1471 if name is None:
1472 constraint_name = self._unique_constraint_name(table, columns, quote=True)
1473 else:
1474 constraint_name = self.quote_name(name)
1475 if condition or include or opclasses or expressions:
1476 sql = self.sql_create_unique_index
1477 else:
1478 sql = self.sql_create_unique
1479 if columns:
1480 columns_obj: Columns | Expressions = self._index_columns(
1481 table, columns, col_suffixes=(), opclasses=opclasses or ()
1482 )
1483 else:
1484 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1485 return Statement(
1486 sql,
1487 table=Table(table, self.quote_name),
1488 name=constraint_name,
1489 columns=columns_obj,
1490 condition=self._index_condition_sql(condition),
1491 deferrable=self._deferrable_constraint_sql(deferrable),
1492 include=self._index_include_sql(model, include),
1493 )
1494
1495 def _unique_constraint_name(
1496 self, table: str, columns: list[str], quote: bool = True
1497 ) -> IndexName | str:
1498 if quote:
1499
1500 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1501 return self.quote_name(self._create_index_name(*args, **kwargs))
1502
1503 else:
1504 create_unique_name = self._create_index_name
1505
1506 return IndexName(table, columns, "_uniq", create_unique_name)
1507
1508 def _delete_unique_sql(
1509 self,
1510 model: type[Model],
1511 name: str,
1512 condition: str | None = None,
1513 deferrable: Deferrable | None = None,
1514 include: list[str] | None = None,
1515 opclasses: tuple[str, ...] | None = None,
1516 expressions: Any = None,
1517 ) -> Statement | None:
1518 if (
1519 (
1520 deferrable
1521 and not self.connection.features.supports_deferrable_unique_constraints
1522 )
1523 or (condition and not self.connection.features.supports_partial_indexes)
1524 or (include and not self.connection.features.supports_covering_indexes)
1525 or (
1526 expressions and not self.connection.features.supports_expression_indexes
1527 )
1528 ):
1529 return None
1530 if condition or include or opclasses or expressions:
1531 sql = self.sql_delete_index
1532 else:
1533 sql = self.sql_delete_unique
1534 return self._delete_constraint_sql(sql, model, name)
1535
1536 def _check_sql(self, name: str, check: str) -> str:
1537 return self.sql_constraint % {
1538 "name": self.quote_name(name),
1539 "constraint": self.sql_check_constraint % {"check": check},
1540 }
1541
1542 def _create_check_sql(
1543 self, model: type[Model], name: str, check: str
1544 ) -> Statement | None:
1545 if not self.connection.features.supports_table_check_constraints:
1546 return None
1547 return Statement(
1548 self.sql_create_check,
1549 table=Table(model.model_options.db_table, self.quote_name),
1550 name=self.quote_name(name),
1551 check=check,
1552 )
1553
1554 def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1555 if not self.connection.features.supports_table_check_constraints:
1556 return None
1557 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1558
1559 def _delete_constraint_sql(
1560 self, template: str, model: type[Model], name: str
1561 ) -> Statement:
1562 return Statement(
1563 template,
1564 table=Table(model.model_options.db_table, self.quote_name),
1565 name=self.quote_name(name),
1566 )
1567
1568 def _constraint_names(
1569 self,
1570 model: type[Model],
1571 column_names: list[str] | None = None,
1572 unique: bool | None = None,
1573 primary_key: bool | None = None,
1574 index: bool | None = None,
1575 foreign_key: bool | None = None,
1576 check: bool | None = None,
1577 type_: str | None = None,
1578 exclude: set[str] | None = None,
1579 ) -> list[str]:
1580 """Return all constraint names matching the columns and conditions."""
1581 if column_names is not None:
1582 column_names = [
1583 self.connection.introspection.identifier_converter(
1584 truncate_name(name, self.connection.ops.max_name_length())
1585 )
1586 if self.connection.features.truncates_names
1587 else self.connection.introspection.identifier_converter(name)
1588 for name in column_names
1589 ]
1590 with self.connection.cursor() as cursor:
1591 constraints = self.connection.introspection.get_constraints(
1592 cursor, model.model_options.db_table
1593 )
1594 result: list[str] = []
1595 for name, infodict in constraints.items():
1596 if column_names is None or column_names == infodict["columns"]:
1597 if unique is not None and infodict["unique"] != unique:
1598 continue
1599 if primary_key is not None and infodict["primary_key"] != primary_key:
1600 continue
1601 if index is not None and infodict["index"] != index:
1602 continue
1603 if check is not None and infodict["check"] != check:
1604 continue
1605 if foreign_key is not None and not infodict["foreign_key"]:
1606 continue
1607 if type_ is not None and infodict["type"] != type_:
1608 continue
1609 if not exclude or name not in exclude:
1610 result.append(name)
1611 return result
1612
1613 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1614 constraint_names = self._constraint_names(model, primary_key=True)
1615 if strict and len(constraint_names) != 1:
1616 raise ValueError(
1617 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1618 )
1619 for constraint_name in constraint_names:
1620 self.execute(self._delete_primary_key_sql(model, constraint_name))
1621
1622 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1623 return Statement(
1624 self.sql_create_pk,
1625 table=Table(model.model_options.db_table, self.quote_name),
1626 name=self.quote_name(
1627 self._create_index_name(
1628 model.model_options.db_table, [field.column], suffix="_pk"
1629 )
1630 ),
1631 columns=Columns(
1632 model.model_options.db_table, [field.column], self.quote_name
1633 ),
1634 )
1635
1636 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1637 return self._delete_constraint_sql(self.sql_delete_pk, model, name)