1from __future__ import annotations
2
3import logging
4import operator
5from collections.abc import Generator
6from datetime import datetime
7from typing import TYPE_CHECKING, Any
8
9from plain.models.backends.ddl_references import (
10 Columns,
11 Expressions,
12 ForeignKeyName,
13 IndexName,
14 Statement,
15 Table,
16)
17from plain.models.backends.utils import names_digest, split_identifier, truncate_name
18from plain.models.constraints import Deferrable
19from plain.models.indexes import Index
20from plain.models.sql import Query
21from plain.models.transaction import TransactionManagementError, atomic
22from plain.utils import timezone
23
24if TYPE_CHECKING:
25 from collections.abc import Iterable
26
27 from plain.models.backends.base.base import BaseDatabaseWrapper
28 from plain.models.base import Model
29 from plain.models.constraints import BaseConstraint
30 from plain.models.fields import Field
31 from plain.models.fields.related import ForeignKey, ManyToManyField
32 from plain.models.fields.reverse_related import ManyToManyRel
33
34logger = logging.getLogger("plain.models.backends.schema")
35
36
37def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
38 """
39 When altering the given field, must constraints on its model from the given
40 relation be temporarily dropped?
41 """
42 field = relation.field
43 if field.many_to_many:
44 # M2M reverse field
45 return False
46 if altered_field.primary_key:
47 # Foreign key constraint on the primary key, which is being altered.
48 return True
49 # ForeignKey always targets 'id'
50 return altered_field.name == "id"
51
52
53def _all_related_fields(model: type[Model]) -> list[Any]:
54 # Related fields must be returned in a deterministic order.
55 return sorted(
56 model._model_meta._get_fields(
57 forward=False,
58 reverse=True,
59 include_hidden=True,
60 ),
61 key=operator.attrgetter("name"),
62 )
63
64
65def _related_non_m2m_objects(
66 old_field: Field, new_field: Field
67) -> Generator[tuple[Any, Any], None, None]:
68 # Filter out m2m objects from reverse relations.
69 # Return (old_relation, new_relation) tuples.
70 related_fields = zip(
71 (
72 obj
73 for obj in _all_related_fields(old_field.model)
74 if _is_relevant_relation(obj, old_field)
75 ),
76 (
77 obj
78 for obj in _all_related_fields(new_field.model)
79 if _is_relevant_relation(obj, new_field)
80 ),
81 )
82 for old_rel, new_rel in related_fields:
83 yield old_rel, new_rel
84 yield from _related_non_m2m_objects(
85 old_rel.remote_field,
86 new_rel.remote_field,
87 )
88
89
90class BaseDatabaseSchemaEditor:
91 """
92 This class and its subclasses are responsible for emitting schema-changing
93 statements to the databases - model creation/removal/alteration, field
94 renaming, index fiddling, and so on.
95 """
96
97 # Overrideable SQL templates
98 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
99 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
100 sql_delete_table = "DROP TABLE %(table)s CASCADE"
101
102 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
103 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
104 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
105 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
106 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
107 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
108 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
109 sql_alter_column_no_default_null = sql_alter_column_no_default
110 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
111 sql_rename_column = (
112 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
113 )
114 sql_update_with_default = (
115 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
116 )
117
118 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
119 sql_check_constraint = "CHECK (%(check)s)"
120 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
121 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
122
123 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
124 sql_delete_check = sql_delete_constraint
125
126 sql_create_unique = (
127 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
128 "UNIQUE (%(columns)s)%(deferrable)s"
129 )
130 sql_delete_unique = sql_delete_constraint
131
132 sql_create_fk = (
133 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
134 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
135 )
136 sql_create_inline_fk = None
137 sql_create_column_inline_fk = None
138 sql_delete_fk = sql_delete_constraint
139
140 sql_create_index = (
141 "CREATE INDEX %(name)s ON %(table)s "
142 "(%(columns)s)%(include)s%(extra)s%(condition)s"
143 )
144 sql_create_unique_index = (
145 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
146 "(%(columns)s)%(include)s%(condition)s"
147 )
148 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
149 sql_delete_index = "DROP INDEX %(name)s"
150
151 sql_create_pk = (
152 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
153 )
154 sql_delete_pk = sql_delete_constraint
155
156 sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
157 sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
158
159 def __init__(
160 self,
161 connection: BaseDatabaseWrapper,
162 atomic: bool = True,
163 ):
164 self.connection = connection
165 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
166
167 # State-managing methods
168
169 def __enter__(self) -> BaseDatabaseSchemaEditor:
170 self.deferred_sql: list[Any] = []
171 self.executed_sql: list[str] = []
172 if self.atomic_migration:
173 self.atomic = atomic()
174 self.atomic.__enter__()
175 return self
176
177 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
178 if exc_type is None:
179 for sql in self.deferred_sql:
180 self.execute(sql)
181 if self.atomic_migration:
182 self.atomic.__exit__(exc_type, exc_value, traceback)
183
184 # Core utility functions
185
186 def execute(
187 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
188 ) -> None:
189 """Execute the given SQL statement, with optional parameters."""
190 if (
191 self.connection.in_atomic_block
192 and not self.connection.features.can_rollback_ddl
193 ):
194 raise TransactionManagementError(
195 "Executing DDL statements while in a transaction on databases "
196 "that can't perform a rollback is prohibited."
197 )
198 # Account for non-string statement objects.
199 sql = str(sql)
200 # Log the command we're running, then run it
201 logger.debug(
202 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
203 )
204
205 # Track executed SQL for display in migration output
206 # Store the SQL for display (interpolate params for readability)
207 if params:
208 self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
209 else:
210 self.executed_sql.append(sql)
211
212 with self.connection.cursor() as cursor:
213 cursor.execute(sql, params)
214
215 def quote_name(self, name: str) -> str:
216 return self.connection.ops.quote_name(name)
217
218 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
219 """Take a model and return its table definition."""
220 # Create column SQL, add FK deferreds if needed.
221 column_sqls = []
222 params = []
223 for field in model._model_meta.local_fields:
224 # SQL.
225 definition, extra_params = self.column_sql(model, field)
226 if definition is None:
227 continue
228 # Check constraints can go on the column SQL here.
229 db_params = field.db_parameters(connection=self.connection)
230 if db_params["check"]:
231 definition += " " + self.sql_check_constraint % db_params
232 # Autoincrement SQL (for backends with inline variant).
233 col_type_suffix = field.db_type_suffix(connection=self.connection)
234 if col_type_suffix:
235 definition += f" {col_type_suffix}"
236 params.extend(extra_params)
237 # FK.
238 if field.remote_field and field.db_constraint: # type: ignore[attr-defined]
239 to_table = field.remote_field.model.model_options.db_table
240 to_column = field.remote_field.model._model_meta.get_field(
241 field.remote_field.field_name
242 ).column
243 if self.sql_create_inline_fk:
244 definition += " " + self.sql_create_inline_fk % {
245 "to_table": self.quote_name(to_table),
246 "to_column": self.quote_name(to_column),
247 }
248 elif self.connection.features.supports_foreign_keys:
249 self.deferred_sql.append(
250 self._create_fk_sql(
251 model, field, "_fk_%(to_table)s_%(to_column)s"
252 )
253 )
254 # Add the SQL to our big list.
255 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
256 # Autoincrement SQL (for backends with post table definition
257 # variant).
258 if field.get_internal_type() in ("PrimaryKeyField",):
259 autoinc_sql = self.connection.ops.autoinc_sql(
260 model.model_options.db_table, field.column
261 )
262 if autoinc_sql:
263 self.deferred_sql.extend(autoinc_sql)
264 constraints = [
265 constraint.constraint_sql(model, self)
266 for constraint in model.model_options.constraints
267 ]
268 sql = self.sql_create_table % {
269 "table": self.quote_name(model.model_options.db_table),
270 "definition": ", ".join(
271 str(constraint)
272 for constraint in (*column_sqls, *constraints)
273 if constraint
274 ),
275 }
276 return sql, params
277
278 # Field <-> database mapping functions
279
280 def _iter_column_sql(
281 self,
282 column_db_type: str,
283 params: list[Any],
284 model: type[Model],
285 field: Field,
286 field_db_params: dict[str, Any],
287 include_default: bool,
288 ) -> Generator[str, None, None]:
289 yield column_db_type
290 if collation := field_db_params.get("collation"):
291 yield self._collate_sql(collation)
292 if self.connection.features.supports_comments_inline and field.db_comment:
293 yield self._comment_sql(field.db_comment)
294 # Work out nullability.
295 null = field.allow_null
296 # Include a default value, if requested.
297 include_default = (
298 include_default
299 and not self.skip_default(field)
300 and
301 # Don't include a default value if it's a nullable field and the
302 # default cannot be dropped in the ALTER COLUMN statement (e.g.
303 # MySQL longtext and longblob).
304 not (null and self.skip_default_on_alter(field))
305 )
306 if include_default:
307 default_value = self.effective_default(field)
308 if default_value is not None:
309 column_default = "DEFAULT " + self._column_default_sql(field)
310 if self.connection.features.requires_literal_defaults:
311 # Some databases can't take defaults as a parameter (Oracle).
312 # If this is the case, the individual schema backend should
313 # implement prepare_default().
314 yield column_default % self.prepare_default(default_value)
315 else:
316 yield column_default
317 params.append(default_value)
318
319 if not null:
320 yield "NOT NULL"
321 elif not self.connection.features.implied_column_null:
322 yield "NULL"
323
324 if field.primary_key:
325 yield "PRIMARY KEY"
326
327 def column_sql(
328 self, model: type[Model], field: Field, include_default: bool = False
329 ) -> tuple[str | None, list[Any] | None]:
330 """
331 Return the column definition for a field. The field must already have
332 had set_attributes_from_name() called.
333 """
334 # Get the column's type and use that as the basis of the SQL.
335 field_db_params = field.db_parameters(connection=self.connection)
336 column_db_type = field_db_params["type"]
337 # Check for fields that aren't actually columns (e.g. M2M).
338 if column_db_type is None:
339 return None, None
340 params: list[Any] = []
341 return (
342 " ".join(
343 # This appends to the params being returned.
344 self._iter_column_sql(
345 column_db_type,
346 params,
347 model,
348 field,
349 field_db_params,
350 include_default,
351 )
352 ),
353 params,
354 )
355
356 def skip_default(self, field: Field) -> bool:
357 """
358 Some backends don't accept default values for certain columns types
359 (i.e. MySQL longtext and longblob).
360 """
361 return False
362
363 def skip_default_on_alter(self, field: Field) -> bool:
364 """
365 Some backends don't accept default values for certain columns types
366 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
367 """
368 return False
369
370 def prepare_default(self, value: Any) -> str:
371 """
372 Only used for backends which have requires_literal_defaults feature
373 """
374 raise NotImplementedError(
375 "subclasses of BaseDatabaseSchemaEditor for backends which have "
376 "requires_literal_defaults must provide a prepare_default() method"
377 )
378
379 def _column_default_sql(self, field: Field) -> str:
380 """
381 Return the SQL to use in a DEFAULT clause. The resulting string should
382 contain a '%s' placeholder for a default value.
383 """
384 return "%s"
385
386 @staticmethod
387 def _effective_default(field: Field) -> Any:
388 # This method allows testing its logic without a connection.
389 if field.has_default():
390 default = field.get_default()
391 elif (
392 not field.allow_null and not field.required and field.empty_strings_allowed
393 ):
394 if field.get_internal_type() == "BinaryField":
395 default = b""
396 else:
397 default = ""
398 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
399 internal_type = field.get_internal_type()
400 if internal_type == "DateTimeField":
401 default = timezone.now()
402 else:
403 default = datetime.now()
404 if internal_type == "DateField":
405 default = default.date()
406 elif internal_type == "TimeField":
407 default = default.time()
408 else:
409 default = None
410 return default
411
412 def effective_default(self, field: Field) -> Any:
413 """Return a field's effective database default value."""
414 return field.get_db_prep_save(self._effective_default(field), self.connection)
415
416 def quote_value(self, value: Any) -> str:
417 """
418 Return a quoted version of the value so it's safe to use in an SQL
419 string. This is not safe against injection from user code; it is
420 intended only for use in making SQL scripts or preparing default values
421 for particularly tricky backends (defaults are not user-defined, though,
422 so this is safe).
423 """
424 raise NotImplementedError()
425
426 # Actions
427
428 def create_model(self, model: type[Model]) -> None:
429 """
430 Create a table and any accompanying indexes or unique constraints for
431 the given `model`.
432 """
433 sql, params = self.table_sql(model)
434 # Prevent using [] as params, in the case a literal '%' is used in the
435 # definition.
436 self.execute(sql, params or None)
437
438 if self.connection.features.supports_comments:
439 # Add table comment.
440 if model.model_options.db_table_comment:
441 self.alter_db_table_comment(
442 model, None, model.model_options.db_table_comment
443 )
444 # Add column comments.
445 if not self.connection.features.supports_comments_inline:
446 for field in model._model_meta.local_fields:
447 if field.db_comment:
448 field_db_params = field.db_parameters(
449 connection=self.connection
450 )
451 field_type = field_db_params["type"]
452 self.execute(
453 *self._alter_column_comment_sql(
454 model, field, field_type, field.db_comment
455 )
456 )
457 # Add any field index (deferred as SQLite _remake_table needs it).
458 self.deferred_sql.extend(self._model_indexes_sql(model))
459
460 def delete_model(self, model: type[Model]) -> None:
461 """Delete a model from the database."""
462
463 # Delete the table
464 self.execute(
465 self.sql_delete_table
466 % {
467 "table": self.quote_name(model.model_options.db_table),
468 }
469 )
470 # Remove all deferred statements referencing the deleted table.
471 for sql in list(self.deferred_sql):
472 if isinstance(sql, Statement) and sql.references_table(
473 model.model_options.db_table
474 ):
475 self.deferred_sql.remove(sql)
476
477 def add_index(self, model: type[Model], index: Index) -> None:
478 """Add an index on a model."""
479 if (
480 index.contains_expressions
481 and not self.connection.features.supports_expression_indexes
482 ):
483 return None
484 # Index.create_sql returns interpolated SQL which makes params=None a
485 # necessity to avoid escaping attempts on execution.
486 self.execute(index.create_sql(model, self), params=None)
487
488 def remove_index(self, model: type[Model], index: Index) -> None:
489 """Remove an index from a model."""
490 if (
491 index.contains_expressions
492 and not self.connection.features.supports_expression_indexes
493 ):
494 return None
495 self.execute(index.remove_sql(model, self))
496
497 def rename_index(
498 self, model: type[Model], old_index: Index, new_index: Index
499 ) -> None:
500 if self.connection.features.can_rename_index:
501 self.execute(
502 self._rename_index_sql(model, old_index.name, new_index.name),
503 params=None,
504 )
505 else:
506 self.remove_index(model, old_index)
507 self.add_index(model, new_index)
508
509 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
510 """Add a constraint to a model."""
511 sql = constraint.create_sql(model, self)
512 if sql:
513 # Constraint.create_sql returns interpolated SQL which makes
514 # params=None a necessity to avoid escaping attempts on execution.
515 self.execute(sql, params=None)
516
517 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
518 """Remove a constraint from a model."""
519 sql = constraint.remove_sql(model, self)
520 if sql:
521 self.execute(sql)
522
523 def alter_db_table(
524 self, model: type[Model], old_db_table: str, new_db_table: str
525 ) -> None:
526 """Rename the table a model points to."""
527 if old_db_table == new_db_table or (
528 self.connection.features.ignores_table_name_case
529 and old_db_table.lower() == new_db_table.lower()
530 ):
531 return
532 self.execute(
533 self.sql_rename_table
534 % {
535 "old_table": self.quote_name(old_db_table),
536 "new_table": self.quote_name(new_db_table),
537 }
538 )
539 # Rename all references to the old table name.
540 for sql in self.deferred_sql:
541 if isinstance(sql, Statement):
542 sql.rename_table_references(old_db_table, new_db_table)
543
544 def alter_db_table_comment(
545 self,
546 model: type[Model],
547 old_db_table_comment: str | None,
548 new_db_table_comment: str | None,
549 ) -> None:
550 self.execute(
551 self.sql_alter_table_comment
552 % {
553 "table": self.quote_name(model.model_options.db_table),
554 "comment": self.quote_value(new_db_table_comment or ""),
555 }
556 )
557
558 def add_field(self, model: type[Model], field: Field) -> None:
559 """
560 Create a field on a model. Usually involves adding a column, but may
561 involve adding a table instead (for M2M fields).
562 """
563 # Get the column's definition
564 definition, params = self.column_sql(model, field, include_default=True)
565 # It might not actually have a column behind it
566 if definition is None:
567 return
568 if col_type_suffix := field.db_type_suffix(connection=self.connection):
569 definition += f" {col_type_suffix}"
570 # Check constraints can go on the column SQL here
571 db_params = field.db_parameters(connection=self.connection)
572 if db_params["check"]:
573 definition += " " + self.sql_check_constraint % db_params
574 if (
575 field.remote_field
576 and self.connection.features.supports_foreign_keys
577 and field.db_constraint # type: ignore[attr-defined]
578 ):
579 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
580 # Add FK constraint inline, if supported.
581 if self.sql_create_column_inline_fk:
582 to_table = field.remote_field.model.model_options.db_table
583 to_column = field.remote_field.model._model_meta.get_field(
584 field.remote_field.field_name
585 ).column
586 namespace, _ = split_identifier(model.model_options.db_table)
587 definition += " " + self.sql_create_column_inline_fk % {
588 "name": self._fk_constraint_name(model, field, constraint_suffix),
589 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
590 "column": self.quote_name(field.column),
591 "to_table": self.quote_name(to_table),
592 "to_column": self.quote_name(to_column),
593 "deferrable": self.connection.ops.deferrable_sql(),
594 }
595 # Otherwise, add FK constraints later.
596 else:
597 self.deferred_sql.append(
598 self._create_fk_sql(model, field, constraint_suffix)
599 )
600 # Build the SQL and run it
601 sql = self.sql_create_column % {
602 "table": self.quote_name(model.model_options.db_table),
603 "column": self.quote_name(field.column),
604 "definition": definition,
605 }
606 self.execute(sql, params)
607 # Drop the default if we need to
608 # (Plain usually does not use in-database defaults)
609 if (
610 not self.skip_default_on_alter(field)
611 and self.effective_default(field) is not None
612 ):
613 changes_sql, params = self._alter_column_default_sql(
614 model, None, field, drop=True
615 )
616 sql = self.sql_alter_column % {
617 "table": self.quote_name(model.model_options.db_table),
618 "changes": changes_sql,
619 }
620 self.execute(sql, params)
621 # Add field comment, if required.
622 if (
623 field.db_comment
624 and self.connection.features.supports_comments
625 and not self.connection.features.supports_comments_inline
626 ):
627 field_type = db_params["type"]
628 self.execute(
629 *self._alter_column_comment_sql(
630 model, field, field_type, field.db_comment
631 )
632 )
633 # Add an index, if required
634 self.deferred_sql.extend(self._field_indexes_sql(model, field))
635 # Reset connection if required
636 if self.connection.features.connection_persists_old_columns:
637 self.connection.close()
638
639 def remove_field(self, model: type[Model], field: Field) -> None:
640 """
641 Remove a field from a model. Usually involves deleting a column,
642 but for M2Ms may involve deleting a table.
643 """
644 # It might not actually have a column behind it
645 if field.db_parameters(connection=self.connection)["type"] is None:
646 return
647 # Drop any FK constraints, MySQL requires explicit deletion
648 if field.remote_field:
649 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
650 for fk_name in fk_names:
651 self.execute(self._delete_fk_sql(model, fk_name))
652 # Delete the column
653 sql = self.sql_delete_column % {
654 "table": self.quote_name(model.model_options.db_table),
655 "column": self.quote_name(field.column),
656 }
657 self.execute(sql)
658 # Reset connection if required
659 if self.connection.features.connection_persists_old_columns:
660 self.connection.close()
661 # Remove all deferred statements referencing the deleted column.
662 for sql in list(self.deferred_sql):
663 if isinstance(sql, Statement) and sql.references_column(
664 model.model_options.db_table, field.column
665 ):
666 self.deferred_sql.remove(sql)
667
668 def alter_field(
669 self,
670 model: type[Model],
671 old_field: Field,
672 new_field: Field,
673 strict: bool = False,
674 ) -> None:
675 """
676 Allow a field's type, uniqueness, nullability, default, column,
677 constraints, etc. to be modified.
678 `old_field` is required to compute the necessary changes.
679 If `strict` is True, raise errors if the old column does not match
680 `old_field` precisely.
681 """
682 if not self._field_should_be_altered(old_field, new_field):
683 return
684 # Ensure this field is even column-based
685 old_db_params = old_field.db_parameters(connection=self.connection)
686 old_type = old_db_params["type"]
687 new_db_params = new_field.db_parameters(connection=self.connection)
688 new_type = new_db_params["type"]
689 if (old_type is None and old_field.remote_field is None) or (
690 new_type is None and new_field.remote_field is None
691 ):
692 raise ValueError(
693 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
694 "db_type (are you using a badly-written custom field?)",
695 )
696 elif (
697 old_type is None
698 and new_type is None
699 and (old_field.remote_field.through and new_field.remote_field.through) # type: ignore[attr-defined]
700 ):
701 # Both sides have through models; this is a no-op.
702 return
703 elif old_type is None or new_type is None:
704 raise ValueError(
705 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
706 "(you cannot alter to or from M2M fields, or add or remove "
707 "through= on M2M fields)"
708 )
709
710 self._alter_field(
711 model,
712 old_field,
713 new_field,
714 old_type,
715 new_type,
716 old_db_params,
717 new_db_params,
718 strict,
719 )
720
721 def _field_db_check(
722 self, field: Field, field_db_params: dict[str, Any]
723 ) -> str | None:
724 # Always check constraints with the same mocked column name to avoid
725 # recreating constrains when the column is renamed.
726 check_constraints = self.connection.data_type_check_constraints
727 data = field.db_type_parameters(self.connection)
728 data["column"] = "__column_name__"
729 try:
730 return check_constraints[field.get_internal_type()] % data
731 except KeyError:
732 return None
733
734 def _alter_field(
735 self,
736 model: type[Model],
737 old_field: Field,
738 new_field: Field,
739 old_type: str,
740 new_type: str,
741 old_db_params: dict[str, Any],
742 new_db_params: dict[str, Any],
743 strict: bool = False,
744 ) -> None:
745 """Perform a "physical" (non-ManyToMany) field update."""
746 # Drop any FK constraints, we'll remake them later
747 fks_dropped = set()
748 if (
749 self.connection.features.supports_foreign_keys
750 and old_field.remote_field
751 and old_field.db_constraint # type: ignore[attr-defined]
752 and self._field_should_be_altered(
753 old_field,
754 new_field,
755 ignore={"db_comment"},
756 )
757 ):
758 fk_names = self._constraint_names(
759 model, [old_field.column], foreign_key=True
760 )
761 if strict and len(fk_names) != 1:
762 raise ValueError(
763 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
764 )
765 for fk_name in fk_names:
766 fks_dropped.add((old_field.column,))
767 self.execute(self._delete_fk_sql(model, fk_name))
768 # Has unique been removed?
769 if old_field.primary_key and (
770 not new_field.primary_key
771 or self._field_became_primary_key(old_field, new_field)
772 ):
773 # Find the unique constraint for this field
774 meta_constraint_names = {
775 constraint.name for constraint in model.model_options.constraints
776 }
777 constraint_names = self._constraint_names(
778 model,
779 [old_field.column],
780 unique=True,
781 primary_key=False,
782 exclude=meta_constraint_names,
783 )
784 if strict and len(constraint_names) != 1:
785 raise ValueError(
786 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
787 )
788 for constraint_name in constraint_names:
789 self.execute(self._delete_unique_sql(model, constraint_name))
790 # Drop incoming FK constraints if the field is a primary key or unique,
791 # which might be a to_field target, and things are going to change.
792 old_collation = old_db_params.get("collation")
793 new_collation = new_db_params.get("collation")
794 drop_foreign_keys = (
795 self.connection.features.supports_foreign_keys
796 and (old_field.primary_key and new_field.primary_key)
797 and ((old_type != new_type) or (old_collation != new_collation))
798 )
799 if drop_foreign_keys:
800 # '_model_meta.related_field' also contains M2M reverse fields, these
801 # will be filtered out
802 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
803 rel_fk_names = self._constraint_names(
804 new_rel.related_model, [new_rel.field.column], foreign_key=True
805 )
806 for fk_name in rel_fk_names:
807 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
808 # Removed an index? (no strict check, as multiple indexes are possible)
809 # Remove indexes if db_index switched to False or a unique constraint
810 # will now be used in lieu of an index. The following lines from the
811 # truth table show all True cases; the rest are False:
812 #
813 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
814 # ------------------------------------------------------------------------------
815 # True | False | False | False
816 # True | False | False | True
817 # True | False | True | True
818 if (
819 (old_field.remote_field and old_field.db_index) # type: ignore[attr-defined]
820 and not old_field.primary_key
821 and (
822 not (new_field.remote_field and new_field.db_index) # type: ignore[attr-defined]
823 or new_field.primary_key
824 )
825 ):
826 # Find the index for this field
827 meta_index_names = {index.name for index in model.model_options.indexes}
828 # Retrieve only BTREE indexes since this is what's created with
829 # db_index=True.
830 index_names = self._constraint_names(
831 model,
832 [old_field.column],
833 index=True,
834 type_=Index.suffix,
835 exclude=meta_index_names,
836 )
837 for index_name in index_names:
838 # The only way to check if an index was created with
839 # db_index=True or with Index(['field'], name='foo')
840 # is to look at its name (refs #28053).
841 self.execute(self._delete_index_sql(model, index_name))
842 # Change check constraints?
843 old_db_check = self._field_db_check(old_field, old_db_params)
844 new_db_check = self._field_db_check(new_field, new_db_params)
845 if old_db_check != new_db_check and old_db_check:
846 meta_constraint_names = {
847 constraint.name for constraint in model.model_options.constraints
848 }
849 constraint_names = self._constraint_names(
850 model,
851 [old_field.column],
852 check=True,
853 exclude=meta_constraint_names,
854 )
855 if strict and len(constraint_names) != 1:
856 raise ValueError(
857 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
858 )
859 for constraint_name in constraint_names:
860 self.execute(self._delete_check_sql(model, constraint_name))
861 # Have they renamed the column?
862 if old_field.column != new_field.column:
863 self.execute(
864 self._rename_field_sql(
865 model.model_options.db_table, old_field, new_field, new_type
866 )
867 )
868 # Rename all references to the renamed column.
869 for sql in self.deferred_sql:
870 if isinstance(sql, Statement):
871 sql.rename_column_references(
872 model.model_options.db_table, old_field.column, new_field.column
873 )
874 # Next, start accumulating actions to do
875 actions = []
876 null_actions = []
877 post_actions = []
878 # Type suffix change? (e.g. auto increment).
879 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
880 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
881 # Type, collation, or comment change?
882 if (
883 old_type != new_type
884 or old_type_suffix != new_type_suffix
885 or old_collation != new_collation
886 or (
887 self.connection.features.supports_comments
888 and old_field.db_comment != new_field.db_comment
889 )
890 ):
891 fragment, other_actions = self._alter_column_type_sql(
892 model, old_field, new_field, new_type, old_collation, new_collation
893 )
894 actions.append(fragment)
895 post_actions.extend(other_actions)
896 # When changing a column NULL constraint to NOT NULL with a given
897 # default value, we need to perform 4 steps:
898 # 1. Add a default for new incoming writes
899 # 2. Update existing NULL rows with new default
900 # 3. Replace NULL constraint with NOT NULL
901 # 4. Drop the default again.
902 # Default change?
903 needs_database_default = False
904 if old_field.allow_null and not new_field.allow_null:
905 old_default = self.effective_default(old_field)
906 new_default = self.effective_default(new_field)
907 if (
908 not self.skip_default_on_alter(new_field)
909 and old_default != new_default
910 and new_default is not None
911 ):
912 needs_database_default = True
913 actions.append(
914 self._alter_column_default_sql(model, old_field, new_field)
915 )
916 # Nullability change?
917 if old_field.allow_null != new_field.allow_null:
918 fragment = self._alter_column_null_sql(model, old_field, new_field)
919 if fragment:
920 null_actions.append(fragment)
921 # Only if we have a default and there is a change from NULL to NOT NULL
922 four_way_default_alteration = new_field.has_default() and (
923 old_field.allow_null and not new_field.allow_null
924 )
925 if actions or null_actions:
926 if not four_way_default_alteration:
927 # If we don't have to do a 4-way default alteration we can
928 # directly run a (NOT) NULL alteration
929 actions += null_actions
930 # Combine actions together if we can (e.g. postgres)
931 if self.connection.features.supports_combined_alters and actions:
932 sql, params = tuple(zip(*actions))
933 actions = [(", ".join(sql), sum(params, []))]
934 # Apply those actions
935 for sql, params in actions:
936 self.execute(
937 self.sql_alter_column
938 % {
939 "table": self.quote_name(model.model_options.db_table),
940 "changes": sql,
941 },
942 params,
943 )
944 if four_way_default_alteration:
945 # Update existing rows with default value
946 self.execute(
947 self.sql_update_with_default
948 % {
949 "table": self.quote_name(model.model_options.db_table),
950 "column": self.quote_name(new_field.column),
951 "default": "%s",
952 },
953 [new_default],
954 )
955 # Since we didn't run a NOT NULL change before we need to do it
956 # now
957 for sql, params in null_actions:
958 self.execute(
959 self.sql_alter_column
960 % {
961 "table": self.quote_name(model.model_options.db_table),
962 "changes": sql,
963 },
964 params,
965 )
966 if post_actions:
967 for sql, params in post_actions:
968 self.execute(sql, params)
969 # If primary_key changed to False, delete the primary key constraint.
970 if old_field.primary_key and not new_field.primary_key:
971 self._delete_primary_key(model, strict)
972
973 # Added an index? Add an index if db_index switched to True or a unique
974 # constraint will no longer be used in lieu of an index. The following
975 # lines from the truth table show all True cases; the rest are False:
976 #
977 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
978 # ------------------------------------------------------------------------------
979 # False | False | True | False
980 # False | True | True | False
981 # True | True | True | False
982 if (
983 (
984 not (old_field.remote_field and old_field.db_index) # type: ignore[attr-defined]
985 or old_field.primary_key
986 )
987 and (new_field.remote_field and new_field.db_index) # type: ignore[attr-defined]
988 and not new_field.primary_key
989 ):
990 self.execute(self._create_index_sql(model, fields=[new_field]))
991 # Type alteration on primary key? Then we need to alter the column
992 # referring to us.
993 rels_to_update = []
994 if drop_foreign_keys:
995 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
996 # Changed to become primary key?
997 if self._field_became_primary_key(old_field, new_field):
998 # Make the new one
999 self.execute(self._create_primary_key_sql(model, new_field))
1000 # Update all referencing columns
1001 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1002 # Handle our type alters on the other end of rels from the PK stuff above
1003 for old_rel, new_rel in rels_to_update:
1004 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1005 rel_type = rel_db_params["type"]
1006 rel_collation = rel_db_params.get("collation")
1007 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1008 old_rel_collation = old_rel_db_params.get("collation")
1009 fragment, other_actions = self._alter_column_type_sql(
1010 new_rel.related_model,
1011 old_rel.field,
1012 new_rel.field,
1013 rel_type,
1014 old_rel_collation,
1015 rel_collation,
1016 )
1017 self.execute(
1018 self.sql_alter_column
1019 % {
1020 "table": self.quote_name(
1021 new_rel.related_model.model_options.db_table
1022 ),
1023 "changes": fragment[0],
1024 },
1025 fragment[1],
1026 )
1027 for sql, params in other_actions:
1028 self.execute(sql, params)
1029 # Does it have a foreign key?
1030 if (
1031 self.connection.features.supports_foreign_keys
1032 and new_field.remote_field
1033 and (
1034 fks_dropped or not old_field.remote_field or not old_field.db_constraint # type: ignore[attr-defined]
1035 )
1036 and new_field.db_constraint # type: ignore[attr-defined]
1037 ):
1038 self.execute(
1039 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1040 )
1041 # Rebuild FKs that pointed to us if we previously had to drop them
1042 if drop_foreign_keys:
1043 for _, rel in rels_to_update:
1044 if rel.field.db_constraint:
1045 self.execute(
1046 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1047 )
1048 # Does it have check constraints we need to add?
1049 if old_db_check != new_db_check and new_db_check:
1050 constraint_name = self._create_index_name(
1051 model.model_options.db_table, [new_field.column], suffix="_check"
1052 )
1053 self.execute(
1054 self._create_check_sql(model, constraint_name, new_db_params["check"])
1055 )
1056 # Drop the default if we need to
1057 # (Plain usually does not use in-database defaults)
1058 if needs_database_default:
1059 changes_sql, params = self._alter_column_default_sql(
1060 model, old_field, new_field, drop=True
1061 )
1062 sql = self.sql_alter_column % {
1063 "table": self.quote_name(model.model_options.db_table),
1064 "changes": changes_sql,
1065 }
1066 self.execute(sql, params)
1067 # Reset connection if required
1068 if self.connection.features.connection_persists_old_columns:
1069 self.connection.close()
1070
1071 def _alter_column_null_sql(
1072 self, model: type[Model], old_field: Field, new_field: Field
1073 ) -> tuple[str, list[Any]]:
1074 """
1075 Hook to specialize column null alteration.
1076
1077 Return a (sql, params) fragment to set a column to null or non-null
1078 as required by new_field, or None if no changes are required.
1079 """
1080 new_db_params = new_field.db_parameters(connection=self.connection)
1081 sql = (
1082 self.sql_alter_column_null
1083 if new_field.allow_null
1084 else self.sql_alter_column_not_null
1085 )
1086 return (
1087 sql
1088 % {
1089 "column": self.quote_name(new_field.column),
1090 "type": new_db_params["type"],
1091 },
1092 [],
1093 )
1094
1095 def _alter_column_default_sql(
1096 self,
1097 model: type[Model],
1098 old_field: Field | None,
1099 new_field: Field,
1100 drop: bool = False,
1101 ) -> tuple[str, list[Any]]:
1102 """
1103 Hook to specialize column default alteration.
1104
1105 Return a (sql, params) fragment to add or drop (depending on the drop
1106 argument) a default to new_field's column.
1107 """
1108 new_default = self.effective_default(new_field)
1109 default = self._column_default_sql(new_field)
1110 params = [new_default]
1111
1112 if drop:
1113 params = []
1114 elif self.connection.features.requires_literal_defaults:
1115 # Some databases (Oracle) can't take defaults as a parameter
1116 # If this is the case, the SchemaEditor for that database should
1117 # implement prepare_default().
1118 default = self.prepare_default(new_default)
1119 params = []
1120
1121 new_db_params = new_field.db_parameters(connection=self.connection)
1122 if drop:
1123 if new_field.allow_null:
1124 sql = self.sql_alter_column_no_default_null
1125 else:
1126 sql = self.sql_alter_column_no_default
1127 else:
1128 sql = self.sql_alter_column_default
1129 return (
1130 sql
1131 % {
1132 "column": self.quote_name(new_field.column),
1133 "type": new_db_params["type"],
1134 "default": default,
1135 },
1136 params,
1137 )
1138
1139 def _alter_column_type_sql(
1140 self,
1141 model: type[Model],
1142 old_field: Field,
1143 new_field: Field,
1144 new_type: str,
1145 old_collation: str | None,
1146 new_collation: str | None,
1147 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1148 """
1149 Hook to specialize column type alteration for different backends,
1150 for cases when a creation type is different to an alteration type
1151 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1152
1153 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1154 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1155 run once the field is altered.
1156 """
1157 other_actions = []
1158 if collate_sql := self._collate_sql(
1159 new_collation, old_collation, model.model_options.db_table
1160 ):
1161 collate_sql = f" {collate_sql}"
1162 else:
1163 collate_sql = ""
1164 # Comment change?
1165 comment_sql = ""
1166 if self.connection.features.supports_comments and not new_field.many_to_many:
1167 if old_field.db_comment != new_field.db_comment:
1168 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1169 # 'COMMENT ON ...' at the same time.
1170 sql, params = self._alter_column_comment_sql(
1171 model, new_field, new_type, new_field.db_comment
1172 )
1173 if sql:
1174 other_actions.append((sql, params))
1175 if new_field.db_comment:
1176 comment_sql = self._comment_sql(new_field.db_comment)
1177 return (
1178 (
1179 self.sql_alter_column_type
1180 % {
1181 "column": self.quote_name(new_field.column),
1182 "type": new_type,
1183 "collation": collate_sql,
1184 "comment": comment_sql,
1185 },
1186 [],
1187 ),
1188 other_actions,
1189 )
1190
1191 def _alter_column_comment_sql(
1192 self,
1193 model: type[Model],
1194 new_field: Field,
1195 new_type: str,
1196 new_db_comment: str | None,
1197 ) -> tuple[str, list[Any]]:
1198 return (
1199 self.sql_alter_column_comment
1200 % {
1201 "table": self.quote_name(model.model_options.db_table),
1202 "column": self.quote_name(new_field.column),
1203 "comment": self._comment_sql(new_db_comment),
1204 },
1205 [],
1206 )
1207
1208 def _comment_sql(self, comment: str | None) -> str:
1209 return self.quote_value(comment or "")
1210
1211 def _alter_many_to_many(
1212 self,
1213 model: type[Model],
1214 old_field: ManyToManyField,
1215 new_field: ManyToManyField,
1216 strict: bool,
1217 ) -> None:
1218 """Alter M2Ms to repoint their to= endpoints."""
1219 # Type narrow for ManyToManyField.remote_field
1220 old_rel: ManyToManyRel = old_field.remote_field # type: ignore[assignment]
1221 new_rel: ManyToManyRel = new_field.remote_field # type: ignore[assignment]
1222
1223 # Rename the through table
1224 if (
1225 old_rel.through.model_options.db_table
1226 != new_rel.through.model_options.db_table
1227 ):
1228 self.alter_db_table(
1229 old_rel.through,
1230 old_rel.through.model_options.db_table,
1231 new_rel.through.model_options.db_table,
1232 )
1233 # Repoint the FK to the other side
1234 self.alter_field(
1235 new_rel.through,
1236 # The field that points to the target model is needed, so we can
1237 # tell alter_field to change it - this is m2m_reverse_field_name()
1238 # (as opposed to m2m_field_name(), which points to our model).
1239 old_rel.through._model_meta.get_field(
1240 old_field.m2m_reverse_field_name() # type: ignore[attr-defined]
1241 ),
1242 new_rel.through._model_meta.get_field(
1243 new_field.m2m_reverse_field_name() # type: ignore[attr-defined]
1244 ),
1245 )
1246 self.alter_field(
1247 new_rel.through,
1248 # for self-referential models we need to alter field from the other end too
1249 old_rel.through._model_meta.get_field(old_field.m2m_field_name()),
1250 new_rel.through._model_meta.get_field(new_field.m2m_field_name()),
1251 )
1252
1253 def _create_index_name(
1254 self, table_name: str, column_names: list[str], suffix: str = ""
1255 ) -> str:
1256 """
1257 Generate a unique name for an index/unique constraint.
1258
1259 The name is divided into 3 parts: the table name, the column names,
1260 and a unique digest and suffix.
1261 """
1262 _, table_name = split_identifier(table_name)
1263 hash_suffix_part = (
1264 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1265 )
1266 max_length = self.connection.ops.max_name_length() or 200
1267 # If everything fits into max_length, use that name.
1268 index_name = "{}_{}_{}".format(
1269 table_name, "_".join(column_names), hash_suffix_part
1270 )
1271 if len(index_name) <= max_length:
1272 return index_name
1273 # Shorten a long suffix.
1274 if len(hash_suffix_part) > max_length / 3:
1275 hash_suffix_part = hash_suffix_part[: max_length // 3]
1276 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1277 index_name = "{}_{}_{}".format(
1278 table_name[:other_length],
1279 "_".join(column_names)[:other_length],
1280 hash_suffix_part,
1281 )
1282 # Prepend D if needed to prevent the name from starting with an
1283 # underscore or a number (not permitted on Oracle).
1284 if index_name[0] == "_" or index_name[0].isdigit():
1285 index_name = f"D{index_name[:-1]}"
1286 return index_name
1287
1288 def _index_condition_sql(self, condition: str | None) -> str:
1289 if condition:
1290 return " WHERE " + condition
1291 return ""
1292
1293 def _index_include_sql(
1294 self, model: type[Model], columns: list[str] | None
1295 ) -> str | Statement:
1296 if not columns or not self.connection.features.supports_covering_indexes:
1297 return ""
1298 return Statement(
1299 " INCLUDE (%(columns)s)",
1300 columns=Columns(model.model_options.db_table, columns, self.quote_name),
1301 )
1302
1303 def _create_index_sql(
1304 self,
1305 model: type[Model],
1306 *,
1307 fields: list[Field] | None = None,
1308 name: str | None = None,
1309 suffix: str = "",
1310 using: str = "",
1311 col_suffixes: tuple[str, ...] = (),
1312 sql: str | None = None,
1313 opclasses: tuple[str, ...] = (),
1314 condition: str | None = None,
1315 include: list[str] | None = None,
1316 expressions: Any = None,
1317 ) -> Statement:
1318 """
1319 Return the SQL statement to create the index for one or several fields
1320 or expressions. `sql` can be specified if the syntax differs from the
1321 standard (GIS indexes, ...).
1322 """
1323 fields = fields or []
1324 expressions = expressions or []
1325 compiler = Query(model, alias_cols=False).get_compiler()
1326 columns = [field.column for field in fields]
1327 sql_create_index = sql or self.sql_create_index
1328 table = model.model_options.db_table
1329
1330 def create_index_name(*args: Any, **kwargs: Any) -> str:
1331 nonlocal name
1332 if name is None:
1333 name = self._create_index_name(*args, **kwargs)
1334 return self.quote_name(name)
1335
1336 return Statement(
1337 sql_create_index,
1338 table=Table(table, self.quote_name),
1339 name=IndexName(table, columns, suffix, create_index_name),
1340 using=using,
1341 columns=(
1342 self._index_columns(table, columns, col_suffixes, opclasses)
1343 if columns
1344 else Expressions(table, expressions, compiler, self.quote_value)
1345 ),
1346 extra="",
1347 condition=self._index_condition_sql(condition),
1348 include=self._index_include_sql(model, include),
1349 )
1350
1351 def _delete_index_sql(
1352 self, model: type[Model], name: str, sql: str | None = None
1353 ) -> Statement:
1354 return Statement(
1355 sql or self.sql_delete_index,
1356 table=Table(model.model_options.db_table, self.quote_name),
1357 name=self.quote_name(name),
1358 )
1359
1360 def _rename_index_sql(
1361 self, model: type[Model], old_name: str, new_name: str
1362 ) -> Statement:
1363 return Statement(
1364 self.sql_rename_index,
1365 table=Table(model.model_options.db_table, self.quote_name),
1366 old_name=self.quote_name(old_name),
1367 new_name=self.quote_name(new_name),
1368 )
1369
1370 def _index_columns(
1371 self,
1372 table: str,
1373 columns: list[str],
1374 col_suffixes: tuple[str, ...],
1375 opclasses: tuple[str, ...],
1376 ) -> Columns:
1377 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1378
1379 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1380 """
1381 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1382 """
1383 output: list[Statement | None] = []
1384 for field in model._model_meta.local_fields:
1385 output.extend(self._field_indexes_sql(model, field))
1386
1387 for index in model.model_options.indexes:
1388 if (
1389 not index.contains_expressions
1390 or self.connection.features.supports_expression_indexes
1391 ):
1392 output.append(index.create_sql(model, self))
1393 return output
1394
1395 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1396 """
1397 Return a list of all index SQL statements for the specified field.
1398 """
1399 output: list[Statement] = []
1400 if self._field_should_be_indexed(model, field):
1401 output.append(self._create_index_sql(model, fields=[field]))
1402 return output
1403
1404 def _field_should_be_altered(
1405 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1406 ) -> bool:
1407 ignore = ignore or set()
1408 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1409 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1410 # Don't alter when:
1411 # - changing only a field name
1412 # - changing an attribute that doesn't affect the schema
1413 # - changing an attribute in the provided set of ignored attributes
1414 # - adding only a db_column and the column name is not changed
1415 for attr in ignore.union(old_field.non_db_attrs): # type: ignore[attr-defined]
1416 old_kwargs.pop(attr, None)
1417 for attr in ignore.union(new_field.non_db_attrs): # type: ignore[attr-defined]
1418 new_kwargs.pop(attr, None)
1419 return self.quote_name(old_field.column) != self.quote_name(
1420 new_field.column
1421 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1422
1423 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1424 return (field.remote_field and field.db_index) and not field.primary_key # type: ignore[attr-defined]
1425
1426 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1427 return not old_field.primary_key and new_field.primary_key
1428
1429 def _rename_field_sql(
1430 self, table: str, old_field: Field, new_field: Field, new_type: str
1431 ) -> str:
1432 return self.sql_rename_column % {
1433 "table": self.quote_name(table),
1434 "old_column": self.quote_name(old_field.column),
1435 "new_column": self.quote_name(new_field.column),
1436 "type": new_type,
1437 }
1438
1439 def _create_fk_sql(
1440 self, model: type[Model], field: ForeignKey, suffix: str
1441 ) -> Statement:
1442 table = Table(model.model_options.db_table, self.quote_name)
1443 name = self._fk_constraint_name(model, field, suffix)
1444 column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1445 to_table = Table(
1446 field.target_field.model.model_options.db_table, self.quote_name
1447 )
1448 to_column = Columns(
1449 field.target_field.model.model_options.db_table,
1450 [field.target_field.column], # type: ignore[attr-defined]
1451 self.quote_name,
1452 )
1453 deferrable = self.connection.ops.deferrable_sql()
1454 return Statement(
1455 self.sql_create_fk,
1456 table=table,
1457 name=name,
1458 column=column,
1459 to_table=to_table,
1460 to_column=to_column,
1461 deferrable=deferrable,
1462 )
1463
1464 def _fk_constraint_name(
1465 self, model: type[Model], field: ForeignKey, suffix: str
1466 ) -> ForeignKeyName:
1467 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1468 return self.quote_name(self._create_index_name(*args, **kwargs))
1469
1470 return ForeignKeyName(
1471 model.model_options.db_table,
1472 [field.column],
1473 split_identifier(field.target_field.model.model_options.db_table)[1],
1474 [field.target_field.column], # type: ignore[attr-defined]
1475 suffix,
1476 create_fk_name,
1477 )
1478
1479 def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1480 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1481
1482 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1483 if deferrable is None:
1484 return ""
1485 if deferrable == Deferrable.DEFERRED:
1486 return " DEFERRABLE INITIALLY DEFERRED"
1487 if deferrable == Deferrable.IMMEDIATE:
1488 return " DEFERRABLE INITIALLY IMMEDIATE"
1489 return ""
1490
1491 def _unique_sql(
1492 self,
1493 model: type[Model],
1494 fields: Iterable[Field],
1495 name: str,
1496 condition: str | None = None,
1497 deferrable: Deferrable | None = None,
1498 include: list[str] | None = None,
1499 opclasses: tuple[str, ...] | None = None,
1500 expressions: Any = None,
1501 ) -> str | None:
1502 if (
1503 deferrable
1504 and not self.connection.features.supports_deferrable_unique_constraints
1505 ):
1506 return None
1507 if condition or include or opclasses or expressions:
1508 # Databases support conditional, covering, and functional unique
1509 # constraints via a unique index.
1510 sql = self._create_unique_sql(
1511 model,
1512 fields,
1513 name=name,
1514 condition=condition,
1515 include=include,
1516 opclasses=opclasses,
1517 expressions=expressions,
1518 )
1519 if sql:
1520 self.deferred_sql.append(sql)
1521 return None
1522 constraint = self.sql_unique_constraint % {
1523 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1524 "deferrable": self._deferrable_constraint_sql(deferrable),
1525 }
1526 return self.sql_constraint % {
1527 "name": self.quote_name(name),
1528 "constraint": constraint,
1529 }
1530
1531 def _create_unique_sql(
1532 self,
1533 model: type[Model],
1534 fields: Iterable[Field],
1535 name: str | None = None,
1536 condition: str | None = None,
1537 deferrable: Deferrable | None = None,
1538 include: list[str] | None = None,
1539 opclasses: tuple[str, ...] | None = None,
1540 expressions: Any = None,
1541 ) -> Statement | None:
1542 if (
1543 (
1544 deferrable
1545 and not self.connection.features.supports_deferrable_unique_constraints
1546 )
1547 or (condition and not self.connection.features.supports_partial_indexes)
1548 or (include and not self.connection.features.supports_covering_indexes)
1549 or (
1550 expressions and not self.connection.features.supports_expression_indexes
1551 )
1552 ):
1553 return None
1554
1555 compiler = Query(model, alias_cols=False).get_compiler()
1556 table = model.model_options.db_table
1557 columns = [field.column for field in fields]
1558 if name is None:
1559 name = self._unique_constraint_name(table, columns, quote=True)
1560 else:
1561 name = self.quote_name(name)
1562 if condition or include or opclasses or expressions:
1563 sql = self.sql_create_unique_index
1564 else:
1565 sql = self.sql_create_unique
1566 if columns:
1567 columns_obj: Columns | Expressions = self._index_columns(
1568 table, columns, col_suffixes=(), opclasses=opclasses
1569 )
1570 else:
1571 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1572 return Statement(
1573 sql,
1574 table=Table(table, self.quote_name),
1575 name=name,
1576 columns=columns_obj,
1577 condition=self._index_condition_sql(condition),
1578 deferrable=self._deferrable_constraint_sql(deferrable),
1579 include=self._index_include_sql(model, include),
1580 )
1581
1582 def _unique_constraint_name(
1583 self, table: str, columns: list[str], quote: bool = True
1584 ) -> IndexName | str:
1585 if quote:
1586
1587 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1588 return self.quote_name(self._create_index_name(*args, **kwargs))
1589
1590 else:
1591 create_unique_name = self._create_index_name
1592
1593 return IndexName(table, columns, "_uniq", create_unique_name)
1594
1595 def _delete_unique_sql(
1596 self,
1597 model: type[Model],
1598 name: str,
1599 condition: str | None = None,
1600 deferrable: Deferrable | None = None,
1601 include: list[str] | None = None,
1602 opclasses: tuple[str, ...] | None = None,
1603 expressions: Any = None,
1604 ) -> Statement | None:
1605 if (
1606 (
1607 deferrable
1608 and not self.connection.features.supports_deferrable_unique_constraints
1609 )
1610 or (condition and not self.connection.features.supports_partial_indexes)
1611 or (include and not self.connection.features.supports_covering_indexes)
1612 or (
1613 expressions and not self.connection.features.supports_expression_indexes
1614 )
1615 ):
1616 return None
1617 if condition or include or opclasses or expressions:
1618 sql = self.sql_delete_index
1619 else:
1620 sql = self.sql_delete_unique
1621 return self._delete_constraint_sql(sql, model, name)
1622
1623 def _check_sql(self, name: str, check: str) -> str:
1624 return self.sql_constraint % {
1625 "name": self.quote_name(name),
1626 "constraint": self.sql_check_constraint % {"check": check},
1627 }
1628
1629 def _create_check_sql(
1630 self, model: type[Model], name: str, check: str
1631 ) -> Statement | None:
1632 if not self.connection.features.supports_table_check_constraints:
1633 return None
1634 return Statement(
1635 self.sql_create_check,
1636 table=Table(model.model_options.db_table, self.quote_name),
1637 name=self.quote_name(name),
1638 check=check,
1639 )
1640
1641 def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1642 if not self.connection.features.supports_table_check_constraints:
1643 return None
1644 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1645
1646 def _delete_constraint_sql(
1647 self, template: str, model: type[Model], name: str
1648 ) -> Statement:
1649 return Statement(
1650 template,
1651 table=Table(model.model_options.db_table, self.quote_name),
1652 name=self.quote_name(name),
1653 )
1654
1655 def _constraint_names(
1656 self,
1657 model: type[Model],
1658 column_names: list[str] | None = None,
1659 unique: bool | None = None,
1660 primary_key: bool | None = None,
1661 index: bool | None = None,
1662 foreign_key: bool | None = None,
1663 check: bool | None = None,
1664 type_: str | None = None,
1665 exclude: set[str] | None = None,
1666 ) -> list[str]:
1667 """Return all constraint names matching the columns and conditions."""
1668 if column_names is not None:
1669 column_names = [
1670 self.connection.introspection.identifier_converter(
1671 truncate_name(name, self.connection.ops.max_name_length())
1672 )
1673 if self.connection.features.truncates_names
1674 else self.connection.introspection.identifier_converter(name)
1675 for name in column_names
1676 ]
1677 with self.connection.cursor() as cursor:
1678 constraints = self.connection.introspection.get_constraints(
1679 cursor, model.model_options.db_table
1680 )
1681 result: list[str] = []
1682 for name, infodict in constraints.items():
1683 if column_names is None or column_names == infodict["columns"]:
1684 if unique is not None and infodict["unique"] != unique:
1685 continue
1686 if primary_key is not None and infodict["primary_key"] != primary_key:
1687 continue
1688 if index is not None and infodict["index"] != index:
1689 continue
1690 if check is not None and infodict["check"] != check:
1691 continue
1692 if foreign_key is not None and not infodict["foreign_key"]:
1693 continue
1694 if type_ is not None and infodict["type"] != type_:
1695 continue
1696 if not exclude or name not in exclude:
1697 result.append(name)
1698 return result
1699
1700 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1701 constraint_names = self._constraint_names(model, primary_key=True)
1702 if strict and len(constraint_names) != 1:
1703 raise ValueError(
1704 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1705 )
1706 for constraint_name in constraint_names:
1707 self.execute(self._delete_primary_key_sql(model, constraint_name))
1708
1709 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1710 return Statement(
1711 self.sql_create_pk,
1712 table=Table(model.model_options.db_table, self.quote_name),
1713 name=self.quote_name(
1714 self._create_index_name(
1715 model.model_options.db_table, [field.column], suffix="_pk"
1716 )
1717 ),
1718 columns=Columns(
1719 model.model_options.db_table, [field.column], self.quote_name
1720 ),
1721 )
1722
1723 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1724 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1725
1726 def _collate_sql(
1727 self,
1728 collation: str | None,
1729 old_collation: str | None = None,
1730 table_name: str | None = None,
1731 ) -> str:
1732 return "COLLATE " + self.quote_name(collation) if collation else ""