1from __future__ import annotations
2
3import logging
4import operator
5from abc import ABC, abstractmethod
6from collections.abc import Generator
7from datetime import datetime
8from typing import TYPE_CHECKING, Any
9
10if TYPE_CHECKING:
11 from typing import Self
12
13from plain.models.backends.ddl_references import (
14 Columns,
15 Expressions,
16 ForeignKeyName,
17 IndexName,
18 Statement,
19 Table,
20)
21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
22from plain.models.constraints import Deferrable
23from plain.models.indexes import Index
24from plain.models.sql import Query
25from plain.models.transaction import TransactionManagementError, atomic
26from plain.utils import timezone
27
28if TYPE_CHECKING:
29 from collections.abc import Iterable
30
31 from plain.models.backends.base.base import BaseDatabaseWrapper
32 from plain.models.base import Model
33 from plain.models.constraints import BaseConstraint
34 from plain.models.fields import Field
35 from plain.models.fields.related import ForeignKey, ManyToManyField
36 from plain.models.fields.reverse_related import ManyToManyRel
37
38logger = logging.getLogger("plain.models.backends.schema")
39
40
def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
    """
    Tell whether constraints contributed by `relation` have to be dropped
    temporarily while `altered_field` is being altered.

    Reverse many-to-many relations never qualify. Any other relation
    qualifies when the altered field is a primary key or is named "id".
    """
    if relation.field.many_to_many:
        # Reverse side of an M2M: nothing to drop.
        return False
    # Either the foreign key constraint sits on the primary key being
    # altered, or the altered field is the 'id' a ForeignKey always targets.
    return True if altered_field.primary_key else altered_field.name == "id"
55
56
def _all_related_fields(model: type[Model]) -> list[Any]:
    """
    Return the reverse (non-forward) fields of `model` as a list ordered by
    their `name` attribute, so callers see a deterministic order.
    """
    reverse_fields = list(
        model._model_meta._get_fields(forward=False, reverse=True)
    )
    reverse_fields.sort(key=operator.attrgetter("name"))
    return reverse_fields
66
67
def _related_non_m2m_objects(
    old_field: Field, new_field: Field
) -> Generator[tuple[Any, Any], None, None]:
    """
    Yield `(old_relation, new_relation)` pairs for the relevant reverse
    relations of `old_field` and `new_field`, pairing them positionally and
    then recursing into each pair's `remote_field`.

    Relations rejected by `_is_relevant_relation` (e.g. reverse M2M) are
    filtered out before pairing.
    """

    def relevant_relations(field: Field) -> Generator[Any, None, None]:
        # Lazily filter the deterministic relation list of the field's model.
        return (
            relation
            for relation in _all_related_fields(field.model)
            if _is_relevant_relation(relation, field)
        )

    paired = zip(relevant_relations(old_field), relevant_relations(new_field))
    for old_rel, new_rel in paired:
        yield old_rel, new_rel
        # Follow the chain: relations pointing at the related field itself.
        yield from _related_non_m2m_objects(
            old_rel.remote_field,
            new_rel.remote_field,
        )
91
92
93class BaseDatabaseSchemaEditor(ABC):
94 """
95 This class and its subclasses are responsible for emitting schema-changing
96 statements to the databases - model creation/removal/alteration, field
97 renaming, index fiddling, and so on.
98 """
99
    # Overrideable SQL templates
    #
    # Every template is a %-style format string with named placeholders
    # (e.g. %(table)s); the methods of this class fill them in with quoted
    # identifiers and SQL fragments. Several names are plain aliases of
    # another template (e.g. sql_delete_check = sql_delete_constraint).

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* entries are fragments
    # substituted into the %(changes)s slot of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    # Backfills rows that are still NULL with a default value.
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic DROP CONSTRAINT statement.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints.
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints.
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The two inline variants default to None: table_sql() and
    # add_field() only emit an inline FK clause when the corresponding
    # template is set, and otherwise defer a separate FK statement.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # Table and column comments.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
161
162 def __init__(
163 self,
164 connection: BaseDatabaseWrapper,
165 atomic: bool = True,
166 ):
167 self.connection = connection
168 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
169
170 # State-managing methods
171
172 def __enter__(self) -> Self:
173 self.deferred_sql: list[Any] = []
174 self.executed_sql: list[str] = []
175 if self.atomic_migration:
176 self.atomic = atomic()
177 self.atomic.__enter__()
178 return self
179
180 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
181 if exc_type is None:
182 for sql in self.deferred_sql:
183 self.execute(sql)
184 if self.atomic_migration:
185 self.atomic.__exit__(exc_type, exc_value, traceback)
186
187 # Core utility functions
188
189 def execute(
190 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
191 ) -> None:
192 """Execute the given SQL statement, with optional parameters."""
193 if (
194 self.connection.in_atomic_block
195 and not self.connection.features.can_rollback_ddl
196 ):
197 raise TransactionManagementError(
198 "Executing DDL statements while in a transaction on databases "
199 "that can't perform a rollback is prohibited."
200 )
201 # Account for non-string statement objects.
202 sql = str(sql)
203 # Log the command we're running, then run it
204 logger.debug(
205 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
206 )
207
208 # Track executed SQL for display in migration output
209 # Store the SQL for display (interpolate params for readability)
210 if params:
211 self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
212 else:
213 self.executed_sql.append(sql)
214
215 with self.connection.cursor() as cursor:
216 cursor.execute(sql, params)
217
218 def quote_name(self, name: str) -> str:
219 return self.connection.ops.quote_name(name)
220
221 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
222 """Take a model and return its table definition."""
223 # Create column SQL, add FK deferreds if needed.
224 column_sqls = []
225 params = []
226 for field in model._model_meta.local_fields:
227 # SQL.
228 definition, extra_params = self.column_sql(model, field)
229 if definition is None:
230 continue
231 # Check constraints can go on the column SQL here.
232 db_params = field.db_parameters(connection=self.connection)
233 if db_params["check"]:
234 definition += " " + self.sql_check_constraint % db_params
235 # Autoincrement SQL (for backends with inline variant).
236 col_type_suffix = field.db_type_suffix(connection=self.connection)
237 if col_type_suffix:
238 definition += f" {col_type_suffix}"
239 if extra_params:
240 params.extend(extra_params)
241 # FK.
242 if field.remote_field and field.db_constraint: # type: ignore[attr-defined]
243 # After checking these attributes, we know field is a ForeignKey
244 fk_field: ForeignKey = field # type: ignore[assignment]
245 assert fk_field.remote_field is not None # Checked in if condition
246 to_table = fk_field.remote_field.model.model_options.db_table
247 to_column = fk_field.remote_field.model._model_meta.get_field(
248 fk_field.remote_field.field_name
249 ).column
250 if self.sql_create_inline_fk:
251 definition += " " + self.sql_create_inline_fk % {
252 "to_table": self.quote_name(to_table),
253 "to_column": self.quote_name(to_column),
254 }
255 elif self.connection.features.supports_foreign_keys:
256 self.deferred_sql.append(
257 self._create_fk_sql(
258 model, fk_field, "_fk_%(to_table)s_%(to_column)s"
259 )
260 )
261 # Add the SQL to our big list.
262 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
263 # Autoincrement SQL (for backends with post table definition
264 # variant).
265 if field.get_internal_type() in ("PrimaryKeyField",):
266 autoinc_sql = self.connection.ops.autoinc_sql(
267 model.model_options.db_table, field.column
268 )
269 if autoinc_sql:
270 self.deferred_sql.extend(autoinc_sql)
271 constraints = [
272 constraint.constraint_sql(model, self)
273 for constraint in model.model_options.constraints
274 ]
275 sql = self.sql_create_table % {
276 "table": self.quote_name(model.model_options.db_table),
277 "definition": ", ".join(
278 str(constraint)
279 for constraint in (*column_sqls, *constraints)
280 if constraint
281 ),
282 }
283 return sql, params
284
285 # Field <-> database mapping functions
286
287 def _iter_column_sql(
288 self,
289 column_db_type: str,
290 params: list[Any],
291 model: type[Model],
292 field: Field,
293 field_db_params: dict[str, Any],
294 include_default: bool,
295 ) -> Generator[str, None, None]:
296 yield column_db_type
297 if collation := field_db_params.get("collation"):
298 yield self._collate_sql(collation)
299 if self.connection.features.supports_comments_inline and field.db_comment:
300 yield self._comment_sql(field.db_comment)
301 # Work out nullability.
302 null = field.allow_null
303 # Include a default value, if requested.
304 include_default = (
305 include_default
306 and not self.skip_default(field)
307 and
308 # Don't include a default value if it's a nullable field and the
309 # default cannot be dropped in the ALTER COLUMN statement (e.g.
310 # MySQL longtext and longblob).
311 not (null and self.skip_default_on_alter(field))
312 )
313 if include_default:
314 default_value = self.effective_default(field)
315 if default_value is not None:
316 column_default = "DEFAULT " + self._column_default_sql(field)
317 if self.connection.features.requires_literal_defaults:
318 # Some databases can't take defaults as a parameter (Oracle).
319 # If this is the case, the individual schema backend should
320 # implement prepare_default().
321 yield column_default % self.prepare_default(default_value)
322 else:
323 yield column_default
324 params.append(default_value)
325
326 if not null:
327 yield "NOT NULL"
328 elif not self.connection.features.implied_column_null:
329 yield "NULL"
330
331 if field.primary_key:
332 yield "PRIMARY KEY"
333
334 def column_sql(
335 self, model: type[Model], field: Field, include_default: bool = False
336 ) -> tuple[str | None, list[Any] | None]:
337 """
338 Return the column definition for a field. The field must already have
339 had set_attributes_from_name() called.
340 """
341 # Get the column's type and use that as the basis of the SQL.
342 field_db_params = field.db_parameters(connection=self.connection)
343 column_db_type = field_db_params["type"]
344 # Check for fields that aren't actually columns (e.g. M2M).
345 if column_db_type is None:
346 return None, None
347 params: list[Any] = []
348 return (
349 " ".join(
350 # This appends to the params being returned.
351 self._iter_column_sql(
352 column_db_type,
353 params,
354 model,
355 field,
356 field_db_params,
357 include_default,
358 )
359 ),
360 params,
361 )
362
363 def skip_default(self, field: Field) -> bool:
364 """
365 Some backends don't accept default values for certain columns types
366 (i.e. MySQL longtext and longblob).
367 """
368 return False
369
370 def skip_default_on_alter(self, field: Field) -> bool:
371 """
372 Some backends don't accept default values for certain columns types
373 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
374 """
375 return False
376
377 def prepare_default(self, value: Any) -> str:
378 """
379 Only used for backends which have requires_literal_defaults feature
380 """
381 raise NotImplementedError(
382 "subclasses of BaseDatabaseSchemaEditor for backends which have "
383 "requires_literal_defaults must provide a prepare_default() method"
384 )
385
386 def _column_default_sql(self, field: Field) -> str:
387 """
388 Return the SQL to use in a DEFAULT clause. The resulting string should
389 contain a '%s' placeholder for a default value.
390 """
391 return "%s"
392
393 @staticmethod
394 def _effective_default(field: Field) -> Any:
395 # This method allows testing its logic without a connection.
396 if field.has_default():
397 default = field.get_default()
398 elif (
399 not field.allow_null and not field.required and field.empty_strings_allowed
400 ):
401 if field.get_internal_type() == "BinaryField":
402 default = b""
403 else:
404 default = ""
405 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
406 internal_type = field.get_internal_type()
407 if internal_type == "DateTimeField":
408 default = timezone.now()
409 else:
410 default = datetime.now()
411 if internal_type == "DateField":
412 default = default.date()
413 elif internal_type == "TimeField":
414 default = default.time()
415 else:
416 default = None
417 return default
418
419 def effective_default(self, field: Field) -> Any:
420 """Return a field's effective database default value."""
421 return field.get_db_prep_save(self._effective_default(field), self.connection)
422
423 @abstractmethod
424 def quote_value(self, value: Any) -> str:
425 """
426 Return a quoted version of the value so it's safe to use in an SQL
427 string. This is not safe against injection from user code; it is
428 intended only for use in making SQL scripts or preparing default values
429 for particularly tricky backends (defaults are not user-defined, though,
430 so this is safe).
431 """
432 ...
433
434 # Actions
435
436 def create_model(self, model: type[Model]) -> None:
437 """
438 Create a table and any accompanying indexes or unique constraints for
439 the given `model`.
440 """
441 sql, params = self.table_sql(model)
442 # Prevent using [] as params, in the case a literal '%' is used in the
443 # definition.
444 self.execute(sql, params or None)
445
446 if self.connection.features.supports_comments:
447 # Add table comment.
448 if model.model_options.db_table_comment:
449 self.alter_db_table_comment(
450 model, None, model.model_options.db_table_comment
451 )
452 # Add column comments.
453 if not self.connection.features.supports_comments_inline:
454 for field in model._model_meta.local_fields:
455 if field.db_comment:
456 field_db_params = field.db_parameters(
457 connection=self.connection
458 )
459 field_type = field_db_params["type"]
460 self.execute(
461 *self._alter_column_comment_sql(
462 model, field, field_type, field.db_comment
463 )
464 )
465 # Add any field index (deferred as SQLite _remake_table needs it).
466 self.deferred_sql.extend(self._model_indexes_sql(model))
467
468 def delete_model(self, model: type[Model]) -> None:
469 """Delete a model from the database."""
470
471 # Delete the table
472 self.execute(
473 self.sql_delete_table
474 % {
475 "table": self.quote_name(model.model_options.db_table),
476 }
477 )
478 # Remove all deferred statements referencing the deleted table.
479 for sql in list(self.deferred_sql):
480 if isinstance(sql, Statement) and sql.references_table(
481 model.model_options.db_table
482 ):
483 self.deferred_sql.remove(sql)
484
485 def add_index(self, model: type[Model], index: Index) -> None:
486 """Add an index on a model."""
487 if (
488 index.contains_expressions
489 and not self.connection.features.supports_expression_indexes
490 ):
491 return None
492 # Index.create_sql returns interpolated SQL which makes params=None a
493 # necessity to avoid escaping attempts on execution.
494 self.execute(index.create_sql(model, self), params=None)
495
496 def remove_index(self, model: type[Model], index: Index) -> None:
497 """Remove an index from a model."""
498 if (
499 index.contains_expressions
500 and not self.connection.features.supports_expression_indexes
501 ):
502 return None
503 self.execute(index.remove_sql(model, self))
504
505 def rename_index(
506 self, model: type[Model], old_index: Index, new_index: Index
507 ) -> None:
508 if self.connection.features.can_rename_index:
509 self.execute(
510 self._rename_index_sql(model, old_index.name, new_index.name),
511 params=None,
512 )
513 else:
514 self.remove_index(model, old_index)
515 self.add_index(model, new_index)
516
517 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
518 """Add a constraint to a model."""
519 sql = constraint.create_sql(model, self)
520 if sql:
521 # Constraint.create_sql returns interpolated SQL which makes
522 # params=None a necessity to avoid escaping attempts on execution.
523 self.execute(sql, params=None)
524
525 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
526 """Remove a constraint from a model."""
527 sql = constraint.remove_sql(model, self)
528 if sql:
529 self.execute(sql)
530
531 def alter_db_table(
532 self, model: type[Model], old_db_table: str, new_db_table: str
533 ) -> None:
534 """Rename the table a model points to."""
535 if old_db_table == new_db_table or (
536 self.connection.features.ignores_table_name_case
537 and old_db_table.lower() == new_db_table.lower()
538 ):
539 return
540 self.execute(
541 self.sql_rename_table
542 % {
543 "old_table": self.quote_name(old_db_table),
544 "new_table": self.quote_name(new_db_table),
545 }
546 )
547 # Rename all references to the old table name.
548 for sql in self.deferred_sql:
549 if isinstance(sql, Statement):
550 sql.rename_table_references(old_db_table, new_db_table)
551
552 def alter_db_table_comment(
553 self,
554 model: type[Model],
555 old_db_table_comment: str | None,
556 new_db_table_comment: str | None,
557 ) -> None:
558 self.execute(
559 self.sql_alter_table_comment
560 % {
561 "table": self.quote_name(model.model_options.db_table),
562 "comment": self.quote_value(new_db_table_comment or ""),
563 }
564 )
565
566 def add_field(self, model: type[Model], field: Field) -> None:
567 """
568 Create a field on a model. Usually involves adding a column, but may
569 involve adding a table instead (for M2M fields).
570 """
571 # Get the column's definition
572 definition, params = self.column_sql(model, field, include_default=True)
573 # It might not actually have a column behind it
574 if definition is None:
575 return
576 if col_type_suffix := field.db_type_suffix(connection=self.connection):
577 definition += f" {col_type_suffix}"
578 # Check constraints can go on the column SQL here
579 db_params = field.db_parameters(connection=self.connection)
580 if db_params["check"]:
581 definition += " " + self.sql_check_constraint % db_params
582 if (
583 field.remote_field
584 and self.connection.features.supports_foreign_keys
585 and field.db_constraint # type: ignore[attr-defined]
586 ):
587 # After checking these attributes, we know field is a ForeignKey
588 fk_field: ForeignKey = field # type: ignore[assignment]
589 assert fk_field.remote_field is not None # Checked in if condition
590 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
591 # Add FK constraint inline, if supported.
592 if self.sql_create_column_inline_fk:
593 to_table = fk_field.remote_field.model.model_options.db_table
594 to_column = fk_field.remote_field.model._model_meta.get_field(
595 fk_field.remote_field.field_name
596 ).column
597 namespace, _ = split_identifier(model.model_options.db_table)
598 definition += " " + self.sql_create_column_inline_fk % {
599 "name": self._fk_constraint_name(
600 model, fk_field, constraint_suffix
601 ),
602 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
603 "column": self.quote_name(fk_field.column),
604 "to_table": self.quote_name(to_table),
605 "to_column": self.quote_name(to_column),
606 "deferrable": self.connection.ops.deferrable_sql(),
607 }
608 # Otherwise, add FK constraints later.
609 else:
610 self.deferred_sql.append(
611 self._create_fk_sql(model, fk_field, constraint_suffix)
612 )
613 # Build the SQL and run it
614 sql = self.sql_create_column % {
615 "table": self.quote_name(model.model_options.db_table),
616 "column": self.quote_name(field.column),
617 "definition": definition,
618 }
619 self.execute(sql, params)
620 # Drop the default if we need to
621 # (Plain usually does not use in-database defaults)
622 if (
623 not self.skip_default_on_alter(field)
624 and self.effective_default(field) is not None
625 ):
626 changes_sql, params = self._alter_column_default_sql(
627 model, None, field, drop=True
628 )
629 sql = self.sql_alter_column % {
630 "table": self.quote_name(model.model_options.db_table),
631 "changes": changes_sql,
632 }
633 self.execute(sql, params)
634 # Add field comment, if required.
635 if (
636 field.db_comment
637 and self.connection.features.supports_comments
638 and not self.connection.features.supports_comments_inline
639 ):
640 field_type = db_params["type"]
641 self.execute(
642 *self._alter_column_comment_sql(
643 model, field, field_type, field.db_comment
644 )
645 )
646 # Add an index, if required
647 self.deferred_sql.extend(self._field_indexes_sql(model, field))
648 # Reset connection if required
649 if self.connection.features.connection_persists_old_columns:
650 self.connection.close()
651
652 def remove_field(self, model: type[Model], field: Field) -> None:
653 """
654 Remove a field from a model. Usually involves deleting a column,
655 but for M2Ms may involve deleting a table.
656 """
657 # It might not actually have a column behind it
658 if field.db_parameters(connection=self.connection)["type"] is None:
659 return
660 # Drop any FK constraints, MySQL requires explicit deletion
661 if field.remote_field:
662 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
663 for fk_name in fk_names:
664 self.execute(self._delete_fk_sql(model, fk_name))
665 # Delete the column
666 sql = self.sql_delete_column % {
667 "table": self.quote_name(model.model_options.db_table),
668 "column": self.quote_name(field.column),
669 }
670 self.execute(sql)
671 # Reset connection if required
672 if self.connection.features.connection_persists_old_columns:
673 self.connection.close()
674 # Remove all deferred statements referencing the deleted column.
675 for sql in list(self.deferred_sql):
676 if isinstance(sql, Statement) and sql.references_column(
677 model.model_options.db_table, field.column
678 ):
679 self.deferred_sql.remove(sql)
680
681 def alter_field(
682 self,
683 model: type[Model],
684 old_field: Field,
685 new_field: Field,
686 strict: bool = False,
687 ) -> None:
688 """
689 Allow a field's type, uniqueness, nullability, default, column,
690 constraints, etc. to be modified.
691 `old_field` is required to compute the necessary changes.
692 If `strict` is True, raise errors if the old column does not match
693 `old_field` precisely.
694 """
695 if not self._field_should_be_altered(old_field, new_field):
696 return
697 # Ensure this field is even column-based
698 old_db_params = old_field.db_parameters(connection=self.connection)
699 old_type = old_db_params["type"]
700 new_db_params = new_field.db_parameters(connection=self.connection)
701 new_type = new_db_params["type"]
702 if (old_type is None and old_field.remote_field is None) or (
703 new_type is None and new_field.remote_field is None
704 ):
705 raise ValueError(
706 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
707 "db_type (are you using a badly-written custom field?)",
708 )
709 elif (
710 old_type is None
711 and new_type is None
712 and (old_field.remote_field.through and new_field.remote_field.through) # type: ignore[attr-defined]
713 ):
714 # Both sides have through models; this is a no-op.
715 return
716 elif old_type is None or new_type is None:
717 raise ValueError(
718 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
719 "(you cannot alter to or from M2M fields, or add or remove "
720 "through= on M2M fields)"
721 )
722
723 self._alter_field(
724 model,
725 old_field,
726 new_field,
727 old_type,
728 new_type,
729 old_db_params,
730 new_db_params,
731 strict,
732 )
733
734 def _field_db_check(
735 self, field: Field, field_db_params: dict[str, Any]
736 ) -> str | None:
737 # Always check constraints with the same mocked column name to avoid
738 # recreating constrains when the column is renamed.
739 check_constraints = self.connection.data_type_check_constraints
740 data = field.db_type_parameters(self.connection)
741 data["column"] = "__column_name__"
742 try:
743 return check_constraints[field.get_internal_type()] % data
744 except KeyError:
745 return None
746
747 def _alter_field(
748 self,
749 model: type[Model],
750 old_field: Field,
751 new_field: Field,
752 old_type: str,
753 new_type: str,
754 old_db_params: dict[str, Any],
755 new_db_params: dict[str, Any],
756 strict: bool = False,
757 ) -> None:
758 """Perform a "physical" (non-ManyToMany) field update."""
759 # Drop any FK constraints, we'll remake them later
760 fks_dropped = set()
761 if (
762 self.connection.features.supports_foreign_keys
763 and old_field.remote_field
764 and old_field.db_constraint # type: ignore[attr-defined]
765 and self._field_should_be_altered(
766 old_field,
767 new_field,
768 ignore={"db_comment"},
769 )
770 ):
771 fk_names = self._constraint_names(
772 model, [old_field.column], foreign_key=True
773 )
774 if strict and len(fk_names) != 1:
775 raise ValueError(
776 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
777 )
778 for fk_name in fk_names:
779 fks_dropped.add((old_field.column,))
780 self.execute(self._delete_fk_sql(model, fk_name))
781 # Has unique been removed?
782 if old_field.primary_key and (
783 not new_field.primary_key
784 or self._field_became_primary_key(old_field, new_field)
785 ):
786 # Find the unique constraint for this field
787 meta_constraint_names = {
788 constraint.name for constraint in model.model_options.constraints
789 }
790 constraint_names = self._constraint_names(
791 model,
792 [old_field.column],
793 unique=True,
794 primary_key=False,
795 exclude=meta_constraint_names,
796 )
797 if strict and len(constraint_names) != 1:
798 raise ValueError(
799 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
800 )
801 for constraint_name in constraint_names:
802 sql = self._delete_unique_sql(model, constraint_name)
803 if sql is not None:
804 self.execute(sql)
805 # Drop incoming FK constraints if the field is a primary key or unique,
806 # which might be a to_field target, and things are going to change.
807 old_collation = old_db_params.get("collation")
808 new_collation = new_db_params.get("collation")
809 drop_foreign_keys = (
810 self.connection.features.supports_foreign_keys
811 and (old_field.primary_key and new_field.primary_key)
812 and ((old_type != new_type) or (old_collation != new_collation))
813 )
814 if drop_foreign_keys:
815 # '_model_meta.related_field' also contains M2M reverse fields, these
816 # will be filtered out
817 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
818 rel_fk_names = self._constraint_names(
819 new_rel.related_model, [new_rel.field.column], foreign_key=True
820 )
821 for fk_name in rel_fk_names:
822 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
823 # Removed an index? (no strict check, as multiple indexes are possible)
824 # Remove indexes if db_index switched to False or a unique constraint
825 # will now be used in lieu of an index. The following lines from the
826 # truth table show all True cases; the rest are False:
827 #
828 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
829 # ------------------------------------------------------------------------------
830 # True | False | False | False
831 # True | False | False | True
832 # True | False | True | True
833 if (
834 (old_field.remote_field and old_field.db_index) # type: ignore[attr-defined]
835 and not old_field.primary_key
836 and (
837 not (new_field.remote_field and new_field.db_index) # type: ignore[attr-defined]
838 or new_field.primary_key
839 )
840 ):
841 # Find the index for this field
842 meta_index_names = {index.name for index in model.model_options.indexes}
843 # Retrieve only BTREE indexes since this is what's created with
844 # db_index=True.
845 index_names = self._constraint_names(
846 model,
847 [old_field.column],
848 index=True,
849 type_=Index.suffix,
850 exclude=meta_index_names,
851 )
852 for index_name in index_names:
853 # The only way to check if an index was created with
854 # db_index=True or with Index(['field'], name='foo')
855 # is to look at its name (refs #28053).
856 self.execute(self._delete_index_sql(model, index_name))
857 # Change check constraints?
858 old_db_check = self._field_db_check(old_field, old_db_params)
859 new_db_check = self._field_db_check(new_field, new_db_params)
860 if old_db_check != new_db_check and old_db_check:
861 meta_constraint_names = {
862 constraint.name for constraint in model.model_options.constraints
863 }
864 constraint_names = self._constraint_names(
865 model,
866 [old_field.column],
867 check=True,
868 exclude=meta_constraint_names,
869 )
870 if strict and len(constraint_names) != 1:
871 raise ValueError(
872 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
873 )
874 for constraint_name in constraint_names:
875 sql = self._delete_check_sql(model, constraint_name)
876 if sql is not None:
877 self.execute(sql)
878 # Have they renamed the column?
879 if old_field.column != new_field.column:
880 self.execute(
881 self._rename_field_sql(
882 model.model_options.db_table, old_field, new_field, new_type
883 )
884 )
885 # Rename all references to the renamed column.
886 for sql in self.deferred_sql:
887 if isinstance(sql, Statement):
888 sql.rename_column_references(
889 model.model_options.db_table, old_field.column, new_field.column
890 )
891 # Next, start accumulating actions to do
892 actions = []
893 null_actions = []
894 post_actions = []
895 # Type suffix change? (e.g. auto increment).
896 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
897 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
898 # Type, collation, or comment change?
899 if (
900 old_type != new_type
901 or old_type_suffix != new_type_suffix
902 or old_collation != new_collation
903 or (
904 self.connection.features.supports_comments
905 and old_field.db_comment != new_field.db_comment
906 )
907 ):
908 fragment, other_actions = self._alter_column_type_sql(
909 model, old_field, new_field, new_type, old_collation, new_collation
910 )
911 actions.append(fragment)
912 post_actions.extend(other_actions)
913 # When changing a column NULL constraint to NOT NULL with a given
914 # default value, we need to perform 4 steps:
915 # 1. Add a default for new incoming writes
916 # 2. Update existing NULL rows with new default
917 # 3. Replace NULL constraint with NOT NULL
918 # 4. Drop the default again.
919 # Default change?
920 needs_database_default = False
921 if old_field.allow_null and not new_field.allow_null:
922 old_default = self.effective_default(old_field)
923 new_default = self.effective_default(new_field)
924 if (
925 not self.skip_default_on_alter(new_field)
926 and old_default != new_default
927 and new_default is not None
928 ):
929 needs_database_default = True
930 actions.append(
931 self._alter_column_default_sql(model, old_field, new_field)
932 )
933 # Nullability change?
934 if old_field.allow_null != new_field.allow_null:
935 fragment = self._alter_column_null_sql(model, old_field, new_field)
936 if fragment:
937 null_actions.append(fragment)
938 # Only if we have a default and there is a change from NULL to NOT NULL
939 four_way_default_alteration = new_field.has_default() and (
940 old_field.allow_null and not new_field.allow_null
941 )
942 if actions or null_actions:
943 if not four_way_default_alteration:
944 # If we don't have to do a 4-way default alteration we can
945 # directly run a (NOT) NULL alteration
946 actions += null_actions
947 # Combine actions together if we can (e.g. postgres)
948 if self.connection.features.supports_combined_alters and actions:
949 sql, params = tuple(zip(*actions))
950 actions = [(", ".join(sql), sum(params, []))]
951 # Apply those actions
952 for sql, params in actions:
953 self.execute(
954 self.sql_alter_column
955 % {
956 "table": self.quote_name(model.model_options.db_table),
957 "changes": sql,
958 },
959 params,
960 )
961 if four_way_default_alteration:
962 # Update existing rows with default value
963 self.execute(
964 self.sql_update_with_default
965 % {
966 "table": self.quote_name(model.model_options.db_table),
967 "column": self.quote_name(new_field.column),
968 "default": "%s",
969 },
970 [new_default],
971 )
972 # Since we didn't run a NOT NULL change before we need to do it
973 # now
974 for sql, params in null_actions:
975 self.execute(
976 self.sql_alter_column
977 % {
978 "table": self.quote_name(model.model_options.db_table),
979 "changes": sql,
980 },
981 params,
982 )
983 if post_actions:
984 for sql, params in post_actions:
985 self.execute(sql, params)
986 # If primary_key changed to False, delete the primary key constraint.
987 if old_field.primary_key and not new_field.primary_key:
988 self._delete_primary_key(model, strict)
989
990 # Added an index? Add an index if db_index switched to True or a unique
991 # constraint will no longer be used in lieu of an index. The following
992 # lines from the truth table show all True cases; the rest are False:
993 #
994 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
995 # ------------------------------------------------------------------------------
996 # False | False | True | False
997 # False | True | True | False
998 # True | True | True | False
999 if (
1000 (
1001 not (old_field.remote_field and old_field.db_index) # type: ignore[attr-defined]
1002 or old_field.primary_key
1003 )
1004 and (new_field.remote_field and new_field.db_index) # type: ignore[attr-defined]
1005 and not new_field.primary_key
1006 ):
1007 self.execute(self._create_index_sql(model, fields=[new_field]))
1008 # Type alteration on primary key? Then we need to alter the column
1009 # referring to us.
1010 rels_to_update = []
1011 if drop_foreign_keys:
1012 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1013 # Changed to become primary key?
1014 if self._field_became_primary_key(old_field, new_field):
1015 # Make the new one
1016 self.execute(self._create_primary_key_sql(model, new_field))
1017 # Update all referencing columns
1018 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1019 # Handle our type alters on the other end of rels from the PK stuff above
1020 for old_rel, new_rel in rels_to_update:
1021 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1022 rel_type = rel_db_params["type"]
1023 rel_collation = rel_db_params.get("collation")
1024 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1025 old_rel_collation = old_rel_db_params.get("collation")
1026 fragment, other_actions = self._alter_column_type_sql(
1027 new_rel.related_model,
1028 old_rel.field,
1029 new_rel.field,
1030 rel_type,
1031 old_rel_collation,
1032 rel_collation,
1033 )
1034 self.execute(
1035 self.sql_alter_column
1036 % {
1037 "table": self.quote_name(
1038 new_rel.related_model.model_options.db_table
1039 ),
1040 "changes": fragment[0],
1041 },
1042 fragment[1],
1043 )
1044 for sql, params in other_actions:
1045 self.execute(sql, params)
1046 # Does it have a foreign key?
1047 if (
1048 self.connection.features.supports_foreign_keys
1049 and new_field.remote_field
1050 and (
1051 fks_dropped or not old_field.remote_field or not old_field.db_constraint # type: ignore[attr-defined]
1052 )
1053 and new_field.db_constraint # type: ignore[attr-defined]
1054 ):
1055 # After checking these attributes, we know new_field is a ForeignKey
1056 fk_field: ForeignKey = new_field # type: ignore[assignment]
1057 assert fk_field.remote_field is not None # Checked in if condition
1058 self.execute(
1059 self._create_fk_sql(model, fk_field, "_fk_%(to_table)s_%(to_column)s")
1060 )
1061 # Rebuild FKs that pointed to us if we previously had to drop them
1062 if drop_foreign_keys:
1063 for _, rel in rels_to_update:
1064 if rel.field.db_constraint:
1065 self.execute(
1066 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1067 )
1068 # Does it have check constraints we need to add?
1069 if old_db_check != new_db_check and new_db_check:
1070 constraint_name = self._create_index_name(
1071 model.model_options.db_table, [new_field.column], suffix="_check"
1072 )
1073 sql = self._create_check_sql(model, constraint_name, new_db_params["check"])
1074 if sql is not None:
1075 self.execute(sql)
1076 # Drop the default if we need to
1077 # (Plain usually does not use in-database defaults)
1078 if needs_database_default:
1079 changes_sql, params = self._alter_column_default_sql(
1080 model, old_field, new_field, drop=True
1081 )
1082 sql = self.sql_alter_column % {
1083 "table": self.quote_name(model.model_options.db_table),
1084 "changes": changes_sql,
1085 }
1086 self.execute(sql, params)
1087 # Reset connection if required
1088 if self.connection.features.connection_persists_old_columns:
1089 self.connection.close()
1090
1091 def _alter_column_null_sql(
1092 self, model: type[Model], old_field: Field, new_field: Field
1093 ) -> tuple[str, list[Any]]:
1094 """
1095 Hook to specialize column null alteration.
1096
1097 Return a (sql, params) fragment to set a column to null or non-null
1098 as required by new_field, or None if no changes are required.
1099 """
1100 new_db_params = new_field.db_parameters(connection=self.connection)
1101 sql = (
1102 self.sql_alter_column_null
1103 if new_field.allow_null
1104 else self.sql_alter_column_not_null
1105 )
1106 return (
1107 sql
1108 % {
1109 "column": self.quote_name(new_field.column),
1110 "type": new_db_params["type"],
1111 },
1112 [],
1113 )
1114
1115 def _alter_column_default_sql(
1116 self,
1117 model: type[Model],
1118 old_field: Field | None,
1119 new_field: Field,
1120 drop: bool = False,
1121 ) -> tuple[str, list[Any]]:
1122 """
1123 Hook to specialize column default alteration.
1124
1125 Return a (sql, params) fragment to add or drop (depending on the drop
1126 argument) a default to new_field's column.
1127 """
1128 new_default = self.effective_default(new_field)
1129 default = self._column_default_sql(new_field)
1130 params = [new_default]
1131
1132 if drop:
1133 params = []
1134 elif self.connection.features.requires_literal_defaults:
1135 # Some databases (Oracle) can't take defaults as a parameter
1136 # If this is the case, the SchemaEditor for that database should
1137 # implement prepare_default().
1138 default = self.prepare_default(new_default)
1139 params = []
1140
1141 new_db_params = new_field.db_parameters(connection=self.connection)
1142 if drop:
1143 if new_field.allow_null:
1144 sql = self.sql_alter_column_no_default_null
1145 else:
1146 sql = self.sql_alter_column_no_default
1147 else:
1148 sql = self.sql_alter_column_default
1149 return (
1150 sql
1151 % {
1152 "column": self.quote_name(new_field.column),
1153 "type": new_db_params["type"],
1154 "default": default,
1155 },
1156 params,
1157 )
1158
1159 def _alter_column_type_sql(
1160 self,
1161 model: type[Model],
1162 old_field: Field,
1163 new_field: Field,
1164 new_type: str,
1165 old_collation: str | None,
1166 new_collation: str | None,
1167 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1168 """
1169 Hook to specialize column type alteration for different backends,
1170 for cases when a creation type is different to an alteration type
1171 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1172
1173 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1174 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1175 run once the field is altered.
1176 """
1177 other_actions = []
1178 if collate_sql := self._collate_sql(
1179 new_collation, old_collation, model.model_options.db_table
1180 ):
1181 collate_sql = f" {collate_sql}"
1182 else:
1183 collate_sql = ""
1184 # Comment change?
1185 comment_sql = ""
1186 if self.connection.features.supports_comments and not new_field.many_to_many:
1187 if old_field.db_comment != new_field.db_comment:
1188 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1189 # 'COMMENT ON ...' at the same time.
1190 sql, params = self._alter_column_comment_sql(
1191 model, new_field, new_type, new_field.db_comment
1192 )
1193 if sql:
1194 other_actions.append((sql, params))
1195 if new_field.db_comment:
1196 comment_sql = self._comment_sql(new_field.db_comment)
1197 return (
1198 (
1199 self.sql_alter_column_type
1200 % {
1201 "column": self.quote_name(new_field.column),
1202 "type": new_type,
1203 "collation": collate_sql,
1204 "comment": comment_sql,
1205 },
1206 [],
1207 ),
1208 other_actions,
1209 )
1210
1211 def _alter_column_comment_sql(
1212 self,
1213 model: type[Model],
1214 new_field: Field,
1215 new_type: str,
1216 new_db_comment: str | None,
1217 ) -> tuple[str, list[Any]]:
1218 return (
1219 self.sql_alter_column_comment
1220 % {
1221 "table": self.quote_name(model.model_options.db_table),
1222 "column": self.quote_name(new_field.column),
1223 "comment": self._comment_sql(new_db_comment),
1224 },
1225 [],
1226 )
1227
1228 def _comment_sql(self, comment: str | None) -> str:
1229 return self.quote_value(comment or "")
1230
1231 def _alter_many_to_many(
1232 self,
1233 model: type[Model],
1234 old_field: ManyToManyField,
1235 new_field: ManyToManyField,
1236 strict: bool,
1237 ) -> None:
1238 """Alter M2Ms to repoint their to= endpoints."""
1239 # Type narrow for ManyToManyField.remote_field
1240 old_rel: ManyToManyRel = old_field.remote_field # type: ignore[assignment]
1241 new_rel: ManyToManyRel = new_field.remote_field # type: ignore[assignment]
1242
1243 # Rename the through table
1244 if (
1245 old_rel.through.model_options.db_table
1246 != new_rel.through.model_options.db_table
1247 ):
1248 self.alter_db_table(
1249 old_rel.through,
1250 old_rel.through.model_options.db_table,
1251 new_rel.through.model_options.db_table,
1252 )
1253 # Repoint the FK to the other side
1254 self.alter_field(
1255 new_rel.through,
1256 # The field that points to the target model is needed, so we can
1257 # tell alter_field to change it - this is m2m_reverse_field_name()
1258 # (as opposed to m2m_field_name(), which points to our model).
1259 old_rel.through._model_meta.get_field(
1260 old_field.m2m_reverse_field_name() # type: ignore[attr-defined]
1261 ),
1262 new_rel.through._model_meta.get_field(
1263 new_field.m2m_reverse_field_name() # type: ignore[attr-defined]
1264 ),
1265 )
1266 self.alter_field(
1267 new_rel.through,
1268 # for self-referential models we need to alter field from the other end too
1269 old_rel.through._model_meta.get_field(old_field.m2m_field_name()),
1270 new_rel.through._model_meta.get_field(new_field.m2m_field_name()),
1271 )
1272
1273 def _create_index_name(
1274 self, table_name: str, column_names: list[str], suffix: str = ""
1275 ) -> str:
1276 """
1277 Generate a unique name for an index/unique constraint.
1278
1279 The name is divided into 3 parts: the table name, the column names,
1280 and a unique digest and suffix.
1281 """
1282 _, table_name = split_identifier(table_name)
1283 hash_suffix_part = (
1284 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1285 )
1286 max_length = self.connection.ops.max_name_length() or 200
1287 # If everything fits into max_length, use that name.
1288 index_name = "{}_{}_{}".format(
1289 table_name, "_".join(column_names), hash_suffix_part
1290 )
1291 if len(index_name) <= max_length:
1292 return index_name
1293 # Shorten a long suffix.
1294 if len(hash_suffix_part) > max_length / 3:
1295 hash_suffix_part = hash_suffix_part[: max_length // 3]
1296 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1297 index_name = "{}_{}_{}".format(
1298 table_name[:other_length],
1299 "_".join(column_names)[:other_length],
1300 hash_suffix_part,
1301 )
1302 # Prepend D if needed to prevent the name from starting with an
1303 # underscore or a number (not permitted on Oracle).
1304 if index_name[0] == "_" or index_name[0].isdigit():
1305 index_name = f"D{index_name[:-1]}"
1306 return index_name
1307
1308 def _index_condition_sql(self, condition: str | None) -> str:
1309 if condition:
1310 return " WHERE " + condition
1311 return ""
1312
1313 def _index_include_sql(
1314 self, model: type[Model], columns: list[str] | None
1315 ) -> str | Statement:
1316 if not columns or not self.connection.features.supports_covering_indexes:
1317 return ""
1318 return Statement(
1319 " INCLUDE (%(columns)s)",
1320 columns=Columns(model.model_options.db_table, columns, self.quote_name),
1321 )
1322
1323 def _create_index_sql(
1324 self,
1325 model: type[Model],
1326 *,
1327 fields: list[Field] | None = None,
1328 name: str | None = None,
1329 suffix: str = "",
1330 using: str = "",
1331 col_suffixes: tuple[str, ...] = (),
1332 sql: str | None = None,
1333 opclasses: tuple[str, ...] = (),
1334 condition: str | None = None,
1335 include: list[str] | None = None,
1336 expressions: Any = None,
1337 ) -> Statement:
1338 """
1339 Return the SQL statement to create the index for one or several fields
1340 or expressions. `sql` can be specified if the syntax differs from the
1341 standard (GIS indexes, ...).
1342 """
1343 fields = fields or []
1344 expressions = expressions or []
1345 compiler = Query(model, alias_cols=False).get_compiler()
1346 columns = [field.column for field in fields]
1347 sql_create_index = sql or self.sql_create_index
1348 table = model.model_options.db_table
1349
1350 def create_index_name(*args: Any, **kwargs: Any) -> str:
1351 nonlocal name
1352 if name is None:
1353 name = self._create_index_name(*args, **kwargs)
1354 return self.quote_name(name)
1355
1356 return Statement(
1357 sql_create_index,
1358 table=Table(table, self.quote_name),
1359 name=IndexName(table, columns, suffix, create_index_name),
1360 using=using,
1361 columns=(
1362 self._index_columns(table, columns, col_suffixes, opclasses)
1363 if columns
1364 else Expressions(table, expressions, compiler, self.quote_value)
1365 ),
1366 extra="",
1367 condition=self._index_condition_sql(condition),
1368 include=self._index_include_sql(model, include),
1369 )
1370
1371 def _delete_index_sql(
1372 self, model: type[Model], name: str, sql: str | None = None
1373 ) -> Statement:
1374 return Statement(
1375 sql or self.sql_delete_index,
1376 table=Table(model.model_options.db_table, self.quote_name),
1377 name=self.quote_name(name),
1378 )
1379
1380 def _rename_index_sql(
1381 self, model: type[Model], old_name: str, new_name: str
1382 ) -> Statement:
1383 return Statement(
1384 self.sql_rename_index,
1385 table=Table(model.model_options.db_table, self.quote_name),
1386 old_name=self.quote_name(old_name),
1387 new_name=self.quote_name(new_name),
1388 )
1389
1390 def _index_columns(
1391 self,
1392 table: str,
1393 columns: list[str],
1394 col_suffixes: tuple[str, ...],
1395 opclasses: tuple[str, ...],
1396 ) -> Columns:
1397 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1398
1399 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1400 """
1401 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1402 """
1403 output: list[Statement | None] = []
1404 for field in model._model_meta.local_fields:
1405 output.extend(self._field_indexes_sql(model, field))
1406
1407 for index in model.model_options.indexes:
1408 if (
1409 not index.contains_expressions
1410 or self.connection.features.supports_expression_indexes
1411 ):
1412 output.append(index.create_sql(model, self))
1413 return output
1414
1415 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1416 """
1417 Return a list of all index SQL statements for the specified field.
1418 """
1419 output: list[Statement] = []
1420 if self._field_should_be_indexed(model, field):
1421 output.append(self._create_index_sql(model, fields=[field]))
1422 return output
1423
1424 def _field_should_be_altered(
1425 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1426 ) -> bool:
1427 ignore = ignore or set()
1428 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1429 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1430 # Don't alter when:
1431 # - changing only a field name
1432 # - changing an attribute that doesn't affect the schema
1433 # - changing an attribute in the provided set of ignored attributes
1434 # - adding only a db_column and the column name is not changed
1435 for attr in ignore.union(old_field.non_db_attrs): # type: ignore[attr-defined]
1436 old_kwargs.pop(attr, None)
1437 for attr in ignore.union(new_field.non_db_attrs): # type: ignore[attr-defined]
1438 new_kwargs.pop(attr, None)
1439 return self.quote_name(old_field.column) != self.quote_name(
1440 new_field.column
1441 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1442
1443 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1444 return (field.remote_field and field.db_index) and not field.primary_key # type: ignore[attr-defined]
1445
1446 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1447 return not old_field.primary_key and new_field.primary_key
1448
1449 def _rename_field_sql(
1450 self, table: str, old_field: Field, new_field: Field, new_type: str
1451 ) -> str:
1452 return self.sql_rename_column % {
1453 "table": self.quote_name(table),
1454 "old_column": self.quote_name(old_field.column),
1455 "new_column": self.quote_name(new_field.column),
1456 "type": new_type,
1457 }
1458
1459 def _create_fk_sql(
1460 self, model: type[Model], field: ForeignKey, suffix: str
1461 ) -> Statement:
1462 table = Table(model.model_options.db_table, self.quote_name)
1463 name = self._fk_constraint_name(model, field, suffix)
1464 column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1465 to_table = Table(
1466 field.target_field.model.model_options.db_table, self.quote_name
1467 )
1468 to_column = Columns(
1469 field.target_field.model.model_options.db_table,
1470 [field.target_field.column], # type: ignore[attr-defined]
1471 self.quote_name,
1472 )
1473 deferrable = self.connection.ops.deferrable_sql()
1474 return Statement(
1475 self.sql_create_fk,
1476 table=table,
1477 name=name,
1478 column=column,
1479 to_table=to_table,
1480 to_column=to_column,
1481 deferrable=deferrable,
1482 )
1483
1484 def _fk_constraint_name(
1485 self, model: type[Model], field: ForeignKey, suffix: str
1486 ) -> ForeignKeyName:
1487 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1488 return self.quote_name(self._create_index_name(*args, **kwargs))
1489
1490 return ForeignKeyName(
1491 model.model_options.db_table,
1492 [field.column],
1493 split_identifier(field.target_field.model.model_options.db_table)[1],
1494 [field.target_field.column], # type: ignore[attr-defined]
1495 suffix,
1496 create_fk_name,
1497 )
1498
1499 def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1500 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1501
1502 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1503 if deferrable is None:
1504 return ""
1505 if deferrable == Deferrable.DEFERRED:
1506 return " DEFERRABLE INITIALLY DEFERRED"
1507 if deferrable == Deferrable.IMMEDIATE:
1508 return " DEFERRABLE INITIALLY IMMEDIATE"
1509 return ""
1510
1511 def _unique_sql(
1512 self,
1513 model: type[Model],
1514 fields: Iterable[Field],
1515 name: str,
1516 condition: str | None = None,
1517 deferrable: Deferrable | None = None,
1518 include: list[str] | None = None,
1519 opclasses: tuple[str, ...] | None = None,
1520 expressions: Any = None,
1521 ) -> str | None:
1522 if (
1523 deferrable
1524 and not self.connection.features.supports_deferrable_unique_constraints
1525 ):
1526 return None
1527 if condition or include or opclasses or expressions:
1528 # Databases support conditional, covering, and functional unique
1529 # constraints via a unique index.
1530 sql = self._create_unique_sql(
1531 model,
1532 fields,
1533 name=name,
1534 condition=condition,
1535 include=include,
1536 opclasses=opclasses,
1537 expressions=expressions,
1538 )
1539 if sql:
1540 self.deferred_sql.append(sql)
1541 return None
1542 constraint = self.sql_unique_constraint % {
1543 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1544 "deferrable": self._deferrable_constraint_sql(deferrable),
1545 }
1546 return self.sql_constraint % {
1547 "name": self.quote_name(name),
1548 "constraint": constraint,
1549 }
1550
1551 def _create_unique_sql(
1552 self,
1553 model: type[Model],
1554 fields: Iterable[Field],
1555 name: str | None = None,
1556 condition: str | None = None,
1557 deferrable: Deferrable | None = None,
1558 include: list[str] | None = None,
1559 opclasses: tuple[str, ...] | None = None,
1560 expressions: Any = None,
1561 ) -> Statement | None:
1562 if (
1563 (
1564 deferrable
1565 and not self.connection.features.supports_deferrable_unique_constraints
1566 )
1567 or (condition and not self.connection.features.supports_partial_indexes)
1568 or (include and not self.connection.features.supports_covering_indexes)
1569 or (
1570 expressions and not self.connection.features.supports_expression_indexes
1571 )
1572 ):
1573 return None
1574
1575 compiler = Query(model, alias_cols=False).get_compiler()
1576 table = model.model_options.db_table
1577 columns = [field.column for field in fields]
1578 constraint_name: IndexName | str
1579 if name is None:
1580 constraint_name = self._unique_constraint_name(table, columns, quote=True)
1581 else:
1582 constraint_name = self.quote_name(name)
1583 if condition or include or opclasses or expressions:
1584 sql = self.sql_create_unique_index
1585 else:
1586 sql = self.sql_create_unique
1587 if columns:
1588 columns_obj: Columns | Expressions = self._index_columns(
1589 table, columns, col_suffixes=(), opclasses=opclasses or ()
1590 )
1591 else:
1592 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1593 return Statement(
1594 sql,
1595 table=Table(table, self.quote_name),
1596 name=constraint_name,
1597 columns=columns_obj,
1598 condition=self._index_condition_sql(condition),
1599 deferrable=self._deferrable_constraint_sql(deferrable),
1600 include=self._index_include_sql(model, include),
1601 )
1602
1603 def _unique_constraint_name(
1604 self, table: str, columns: list[str], quote: bool = True
1605 ) -> IndexName | str:
1606 if quote:
1607
1608 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1609 return self.quote_name(self._create_index_name(*args, **kwargs))
1610
1611 else:
1612 create_unique_name = self._create_index_name
1613
1614 return IndexName(table, columns, "_uniq", create_unique_name)
1615
1616 def _delete_unique_sql(
1617 self,
1618 model: type[Model],
1619 name: str,
1620 condition: str | None = None,
1621 deferrable: Deferrable | None = None,
1622 include: list[str] | None = None,
1623 opclasses: tuple[str, ...] | None = None,
1624 expressions: Any = None,
1625 ) -> Statement | None:
1626 if (
1627 (
1628 deferrable
1629 and not self.connection.features.supports_deferrable_unique_constraints
1630 )
1631 or (condition and not self.connection.features.supports_partial_indexes)
1632 or (include and not self.connection.features.supports_covering_indexes)
1633 or (
1634 expressions and not self.connection.features.supports_expression_indexes
1635 )
1636 ):
1637 return None
1638 if condition or include or opclasses or expressions:
1639 sql = self.sql_delete_index
1640 else:
1641 sql = self.sql_delete_unique
1642 return self._delete_constraint_sql(sql, model, name)
1643
1644 def _check_sql(self, name: str, check: str) -> str:
1645 return self.sql_constraint % {
1646 "name": self.quote_name(name),
1647 "constraint": self.sql_check_constraint % {"check": check},
1648 }
1649
1650 def _create_check_sql(
1651 self, model: type[Model], name: str, check: str
1652 ) -> Statement | None:
1653 if not self.connection.features.supports_table_check_constraints:
1654 return None
1655 return Statement(
1656 self.sql_create_check,
1657 table=Table(model.model_options.db_table, self.quote_name),
1658 name=self.quote_name(name),
1659 check=check,
1660 )
1661
1662 def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1663 if not self.connection.features.supports_table_check_constraints:
1664 return None
1665 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1666
1667 def _delete_constraint_sql(
1668 self, template: str, model: type[Model], name: str
1669 ) -> Statement:
1670 return Statement(
1671 template,
1672 table=Table(model.model_options.db_table, self.quote_name),
1673 name=self.quote_name(name),
1674 )
1675
1676 def _constraint_names(
1677 self,
1678 model: type[Model],
1679 column_names: list[str] | None = None,
1680 unique: bool | None = None,
1681 primary_key: bool | None = None,
1682 index: bool | None = None,
1683 foreign_key: bool | None = None,
1684 check: bool | None = None,
1685 type_: str | None = None,
1686 exclude: set[str] | None = None,
1687 ) -> list[str]:
1688 """Return all constraint names matching the columns and conditions."""
1689 if column_names is not None:
1690 column_names = [
1691 self.connection.introspection.identifier_converter(
1692 truncate_name(name, self.connection.ops.max_name_length())
1693 )
1694 if self.connection.features.truncates_names
1695 else self.connection.introspection.identifier_converter(name)
1696 for name in column_names
1697 ]
1698 with self.connection.cursor() as cursor:
1699 constraints = self.connection.introspection.get_constraints(
1700 cursor, model.model_options.db_table
1701 )
1702 result: list[str] = []
1703 for name, infodict in constraints.items():
1704 if column_names is None or column_names == infodict["columns"]:
1705 if unique is not None and infodict["unique"] != unique:
1706 continue
1707 if primary_key is not None and infodict["primary_key"] != primary_key:
1708 continue
1709 if index is not None and infodict["index"] != index:
1710 continue
1711 if check is not None and infodict["check"] != check:
1712 continue
1713 if foreign_key is not None and not infodict["foreign_key"]:
1714 continue
1715 if type_ is not None and infodict["type"] != type_:
1716 continue
1717 if not exclude or name not in exclude:
1718 result.append(name)
1719 return result
1720
1721 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1722 constraint_names = self._constraint_names(model, primary_key=True)
1723 if strict and len(constraint_names) != 1:
1724 raise ValueError(
1725 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1726 )
1727 for constraint_name in constraint_names:
1728 self.execute(self._delete_primary_key_sql(model, constraint_name))
1729
1730 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1731 return Statement(
1732 self.sql_create_pk,
1733 table=Table(model.model_options.db_table, self.quote_name),
1734 name=self.quote_name(
1735 self._create_index_name(
1736 model.model_options.db_table, [field.column], suffix="_pk"
1737 )
1738 ),
1739 columns=Columns(
1740 model.model_options.db_table, [field.column], self.quote_name
1741 ),
1742 )
1743
1744 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1745 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1746
1747 def _collate_sql(
1748 self,
1749 collation: str | None,
1750 old_collation: str | None = None,
1751 table_name: str | None = None,
1752 ) -> str:
1753 return "COLLATE " + self.quote_name(collation) if collation else ""