1from __future__ import annotations
2
3import logging
4import operator
5from abc import ABC, abstractmethod
6from collections.abc import Generator
7from datetime import datetime
8from typing import TYPE_CHECKING, Any
9
10if TYPE_CHECKING:
11 from typing import Self
12
13from plain.models.backends.ddl_references import (
14 Columns,
15 Expressions,
16 ForeignKeyName,
17 IndexName,
18 Statement,
19 Table,
20)
21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
22from plain.models.constraints import Deferrable
23from plain.models.fields import DbParameters, Field
24from plain.models.fields.related import ForeignKeyField, RelatedField
25from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
26from plain.models.indexes import Index
27from plain.models.sql import Query
28from plain.models.transaction import TransactionManagementError, atomic
29from plain.utils import timezone
30
31if TYPE_CHECKING:
32 from collections.abc import Iterable
33
34 from plain.models.backends.base.base import BaseDatabaseWrapper
35 from plain.models.base import Model
36 from plain.models.constraints import BaseConstraint
37 from plain.models.fields import Field
38 from plain.models.fields.related import ForeignKeyField, ManyToManyField
39 from plain.models.fields.reverse_related import ManyToManyRel
40
41logger = logging.getLogger("plain.models.backends.schema")
42
43
44def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
45 """
46 When altering the given field, must constraints on its model from the given
47 relation be temporarily dropped?
48 """
49 from plain.models.fields.related import ManyToManyField
50
51 field = relation.field
52 if isinstance(field, ManyToManyField):
53 # M2M reverse field
54 return False
55 if altered_field.primary_key:
56 # Foreign key constraint on the primary key, which is being altered.
57 return True
58 # ForeignKeyField always targets 'id'
59 return altered_field.name == "id"
60
61
62def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
63 # Related fields must be returned in a deterministic order.
64 return sorted(
65 model._model_meta._get_fields(
66 forward=False,
67 reverse=True,
68 ),
69 key=operator.attrgetter("name"),
70 )
71
72
73def _related_non_m2m_objects(
74 old_field: Field, new_field: Field
75) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
76 # Filter out m2m objects from reverse relations.
77 # Return (old_relation, new_relation) tuples.
78 related_fields = zip(
79 (
80 obj
81 for obj in _all_related_fields(old_field.model)
82 if _is_relevant_relation(obj, old_field)
83 ),
84 (
85 obj
86 for obj in _all_related_fields(new_field.model)
87 if _is_relevant_relation(obj, new_field)
88 ),
89 )
90 for old_rel, new_rel in related_fields:
91 yield old_rel, new_rel
92 yield from _related_non_m2m_objects(
93 old_rel.remote_field,
94 new_rel.remote_field,
95 )
96
97
98class BaseDatabaseSchemaEditor(ABC):
99 """
100 This class and its subclasses are responsible for emitting schema-changing
101 statements to the databases - model creation/removal/alteration, field
102 renaming, index fiddling, and so on.
103 """
104
105 # Overrideable SQL templates
106 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
107 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
108 sql_delete_table = "DROP TABLE %(table)s CASCADE"
109
110 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
111 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
112 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
113 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
114 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
115 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
116 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
117 sql_alter_column_no_default_null = sql_alter_column_no_default
118 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
119 sql_rename_column = (
120 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
121 )
122 sql_update_with_default = (
123 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
124 )
125
126 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
127 sql_check_constraint = "CHECK (%(check)s)"
128 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
129 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
130
131 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
132 sql_delete_check = sql_delete_constraint
133
134 sql_create_unique = (
135 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
136 "UNIQUE (%(columns)s)%(deferrable)s"
137 )
138 sql_delete_unique = sql_delete_constraint
139
140 sql_create_fk = (
141 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
142 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
143 )
144 sql_create_inline_fk = None
145 sql_create_column_inline_fk = None
146 sql_delete_fk = sql_delete_constraint
147
148 sql_create_index = (
149 "CREATE INDEX %(name)s ON %(table)s "
150 "(%(columns)s)%(include)s%(extra)s%(condition)s"
151 )
152 sql_create_unique_index = (
153 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
154 "(%(columns)s)%(include)s%(condition)s"
155 )
156 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
157 sql_delete_index = "DROP INDEX %(name)s"
158
159 sql_create_pk = (
160 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
161 )
162 sql_delete_pk = sql_delete_constraint
163
164 sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
165 sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
166
167 def __init__(
168 self,
169 connection: BaseDatabaseWrapper,
170 atomic: bool = True,
171 ):
172 self.connection = connection
173 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
174
175 # State-managing methods
176
177 def __enter__(self) -> Self:
178 self.deferred_sql: list[Any] = []
179 self.executed_sql: list[str] = []
180 if self.atomic_migration:
181 self.atomic = atomic()
182 self.atomic.__enter__()
183 return self
184
185 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
186 if exc_type is None:
187 for sql in self.deferred_sql:
188 self.execute(sql)
189 if self.atomic_migration:
190 self.atomic.__exit__(exc_type, exc_value, traceback)
191
192 # Core utility functions
193
194 def execute(
195 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
196 ) -> None:
197 """Execute the given SQL statement, with optional parameters."""
198 if (
199 self.connection.in_atomic_block
200 and not self.connection.features.can_rollback_ddl
201 ):
202 raise TransactionManagementError(
203 "Executing DDL statements while in a transaction on databases "
204 "that can't perform a rollback is prohibited."
205 )
206 # Account for non-string statement objects.
207 sql = str(sql)
208 # Log the command we're running, then run it
209 logger.debug(
210 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
211 )
212
213 # Track executed SQL for display in migration output
214 # Store the SQL for display (interpolate params for readability)
215 if params:
216 self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
217 else:
218 self.executed_sql.append(sql)
219
220 with self.connection.cursor() as cursor:
221 cursor.execute(sql, params)
222
223 def quote_name(self, name: str) -> str:
224 return self.connection.ops.quote_name(name)
225
226 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
227 """Take a model and return its table definition."""
228 # Create column SQL, add FK deferreds if needed.
229 column_sqls = []
230 params = []
231 for field in model._model_meta.local_fields:
232 # SQL.
233 definition, extra_params = self.column_sql(model, field)
234 if definition is None:
235 continue
236 # Check constraints can go on the column SQL here.
237 db_params = field.db_parameters(connection=self.connection)
238 if db_params["check"]:
239 definition += " " + self.sql_check_constraint % db_params
240 # Autoincrement SQL (for backends with inline variant).
241 col_type_suffix = field.db_type_suffix(connection=self.connection)
242 if col_type_suffix:
243 definition += f" {col_type_suffix}"
244 if extra_params:
245 params.extend(extra_params)
246 # FK.
247 if isinstance(field, ForeignKeyField) and field.db_constraint:
248 to_table = field.remote_field.model.model_options.db_table
249 field_name = field.remote_field.field_name
250 if field_name is None:
251 raise ValueError("Foreign key field_name cannot be None")
252 to_field = field.remote_field.model._model_meta.get_forward_field(
253 field_name
254 )
255 to_column = to_field.column
256 if self.sql_create_inline_fk:
257 definition += " " + self.sql_create_inline_fk % {
258 "to_table": self.quote_name(to_table),
259 "to_column": self.quote_name(to_column),
260 }
261 elif self.connection.features.supports_foreign_keys:
262 self.deferred_sql.append(
263 self._create_fk_sql(
264 model, field, "_fk_%(to_table)s_%(to_column)s"
265 )
266 )
267 # Add the SQL to our big list.
268 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
269 constraints = [
270 constraint.constraint_sql(model, self)
271 for constraint in model.model_options.constraints
272 ]
273 sql = self.sql_create_table % {
274 "table": self.quote_name(model.model_options.db_table),
275 "definition": ", ".join(
276 str(constraint)
277 for constraint in (*column_sqls, *constraints)
278 if constraint
279 ),
280 }
281 return sql, params
282
283 # Field <-> database mapping functions
284
285 def _iter_column_sql(
286 self,
287 column_db_type: str,
288 params: list[Any],
289 model: type[Model],
290 field: Field,
291 field_db_params: DbParameters,
292 include_default: bool,
293 ) -> Generator[str, None, None]:
294 yield column_db_type
295 if collation := field_db_params.get("collation"):
296 yield self._collate_sql(collation)
297 if self.connection.features.supports_comments_inline and field.db_comment:
298 yield self._comment_sql(field.db_comment)
299 # Work out nullability.
300 null = field.allow_null
301 # Include a default value, if requested.
302 include_default = (
303 include_default
304 and not self.skip_default(field)
305 and
306 # Don't include a default value if it's a nullable field and the
307 # default cannot be dropped in the ALTER COLUMN statement (e.g.
308 # MySQL longtext and longblob).
309 not (null and self.skip_default_on_alter(field))
310 )
311 if include_default:
312 default_value = self.effective_default(field)
313 if default_value is not None:
314 column_default = "DEFAULT " + self._column_default_sql(field)
315 if self.connection.features.requires_literal_defaults:
316 # Some databases can't take defaults as a parameter (Oracle).
317 # If this is the case, the individual schema backend should
318 # implement prepare_default().
319 yield column_default % self.prepare_default(default_value)
320 else:
321 yield column_default
322 params.append(default_value)
323
324 if not null:
325 yield "NOT NULL"
326 else:
327 yield "NULL"
328
329 if field.primary_key:
330 yield "PRIMARY KEY"
331
332 def column_sql(
333 self, model: type[Model], field: Field, include_default: bool = False
334 ) -> tuple[str | None, list[Any] | None]:
335 """
336 Return the column definition for a field. The field must already have
337 had set_attributes_from_name() called.
338 """
339 # Get the column's type and use that as the basis of the SQL.
340 field_db_params = field.db_parameters(connection=self.connection)
341 column_db_type = field_db_params["type"]
342 # Check for fields that aren't actually columns (e.g. M2M).
343 if column_db_type is None:
344 return None, None
345 params: list[Any] = []
346 return (
347 " ".join(
348 # This appends to the params being returned.
349 self._iter_column_sql(
350 column_db_type,
351 params,
352 model,
353 field,
354 field_db_params,
355 include_default,
356 )
357 ),
358 params,
359 )
360
361 def skip_default(self, field: Field) -> bool:
362 """
363 Some backends don't accept default values for certain columns types
364 (i.e. MySQL longtext and longblob).
365 """
366 return False
367
368 def skip_default_on_alter(self, field: Field) -> bool:
369 """
370 Some backends don't accept default values for certain columns types
371 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
372 """
373 return False
374
375 def prepare_default(self, value: Any) -> str:
376 """
377 Only used for backends which have requires_literal_defaults feature
378 """
379 raise NotImplementedError(
380 "subclasses of BaseDatabaseSchemaEditor for backends which have "
381 "requires_literal_defaults must provide a prepare_default() method"
382 )
383
384 def _column_default_sql(self, field: Field) -> str:
385 """
386 Return the SQL to use in a DEFAULT clause. The resulting string should
387 contain a '%s' placeholder for a default value.
388 """
389 return "%s"
390
391 @staticmethod
392 def _effective_default(field: Field) -> Any:
393 # This method allows testing its logic without a connection.
394 if field.has_default():
395 default = field.get_default()
396 elif (
397 not field.allow_null and not field.required and field.empty_strings_allowed
398 ):
399 if field.get_internal_type() == "BinaryField":
400 default = b""
401 else:
402 default = ""
403 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
404 internal_type = field.get_internal_type()
405 if internal_type == "DateTimeField":
406 default = timezone.now()
407 else:
408 default = datetime.now()
409 if internal_type == "DateField":
410 default = default.date()
411 elif internal_type == "TimeField":
412 default = default.time()
413 else:
414 default = None
415 return default
416
417 def effective_default(self, field: Field) -> Any:
418 """Return a field's effective database default value."""
419 return field.get_db_prep_save(self._effective_default(field), self.connection)
420
421 @abstractmethod
422 def quote_value(self, value: Any) -> str:
423 """
424 Return a quoted version of the value so it's safe to use in an SQL
425 string. This is not safe against injection from user code; it is
426 intended only for use in making SQL scripts or preparing default values
427 for particularly tricky backends (defaults are not user-defined, though,
428 so this is safe).
429 """
430 ...
431
432 # Actions
433
434 def create_model(self, model: type[Model]) -> None:
435 """
436 Create a table and any accompanying indexes or unique constraints for
437 the given `model`.
438 """
439 sql, params = self.table_sql(model)
440 # Prevent using [] as params, in the case a literal '%' is used in the
441 # definition.
442 self.execute(sql, params or None)
443
444 if self.connection.features.supports_comments:
445 # Add table comment.
446 if model.model_options.db_table_comment:
447 self.alter_db_table_comment(
448 model, None, model.model_options.db_table_comment
449 )
450 # Add column comments.
451 if not self.connection.features.supports_comments_inline:
452 for field in model._model_meta.local_fields:
453 if field.db_comment:
454 field_db_params = field.db_parameters(
455 connection=self.connection
456 )
457 field_type = field_db_params["type"]
458 assert field_type is not None
459 self.execute(
460 *self._alter_column_comment_sql(
461 model, field, field_type, field.db_comment
462 )
463 )
464 # Add any field index (deferred as SQLite _remake_table needs it).
465 self.deferred_sql.extend(self._model_indexes_sql(model))
466
467 def delete_model(self, model: type[Model]) -> None:
468 """Delete a model from the database."""
469
470 # Delete the table
471 self.execute(
472 self.sql_delete_table
473 % {
474 "table": self.quote_name(model.model_options.db_table),
475 }
476 )
477 # Remove all deferred statements referencing the deleted table.
478 for sql in list(self.deferred_sql):
479 if isinstance(sql, Statement) and sql.references_table(
480 model.model_options.db_table
481 ):
482 self.deferred_sql.remove(sql)
483
484 def add_index(self, model: type[Model], index: Index) -> None:
485 """Add an index on a model."""
486 if (
487 index.contains_expressions
488 and not self.connection.features.supports_expression_indexes
489 ):
490 return None
491 # Index.create_sql returns interpolated SQL which makes params=None a
492 # necessity to avoid escaping attempts on execution.
493 self.execute(index.create_sql(model, self), params=None)
494
495 def remove_index(self, model: type[Model], index: Index) -> None:
496 """Remove an index from a model."""
497 if (
498 index.contains_expressions
499 and not self.connection.features.supports_expression_indexes
500 ):
501 return None
502 self.execute(index.remove_sql(model, self))
503
504 def rename_index(
505 self, model: type[Model], old_index: Index, new_index: Index
506 ) -> None:
507 if self.connection.features.can_rename_index:
508 self.execute(
509 self._rename_index_sql(model, old_index.name, new_index.name),
510 params=None,
511 )
512 else:
513 self.remove_index(model, old_index)
514 self.add_index(model, new_index)
515
516 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
517 """Add a constraint to a model."""
518 sql = constraint.create_sql(model, self)
519 if sql:
520 # Constraint.create_sql returns interpolated SQL which makes
521 # params=None a necessity to avoid escaping attempts on execution.
522 self.execute(sql, params=None)
523
524 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
525 """Remove a constraint from a model."""
526 sql = constraint.remove_sql(model, self)
527 if sql:
528 self.execute(sql)
529
530 def alter_db_table(
531 self, model: type[Model], old_db_table: str, new_db_table: str
532 ) -> None:
533 """Rename the table a model points to."""
534 if old_db_table == new_db_table or (
535 self.connection.features.ignores_table_name_case
536 and old_db_table.lower() == new_db_table.lower()
537 ):
538 return
539 self.execute(
540 self.sql_rename_table
541 % {
542 "old_table": self.quote_name(old_db_table),
543 "new_table": self.quote_name(new_db_table),
544 }
545 )
546 # Rename all references to the old table name.
547 for sql in self.deferred_sql:
548 if isinstance(sql, Statement):
549 sql.rename_table_references(old_db_table, new_db_table)
550
551 def alter_db_table_comment(
552 self,
553 model: type[Model],
554 old_db_table_comment: str | None,
555 new_db_table_comment: str | None,
556 ) -> None:
557 self.execute(
558 self.sql_alter_table_comment
559 % {
560 "table": self.quote_name(model.model_options.db_table),
561 "comment": self.quote_value(new_db_table_comment or ""),
562 }
563 )
564
565 def add_field(self, model: type[Model], field: Field) -> None:
566 """
567 Create a field on a model. Usually involves adding a column, but may
568 involve adding a table instead (for M2M fields).
569 """
570 # Get the column's definition
571 definition, params = self.column_sql(model, field, include_default=True)
572 # It might not actually have a column behind it
573 if definition is None:
574 return
575 if col_type_suffix := field.db_type_suffix(connection=self.connection):
576 definition += f" {col_type_suffix}"
577 # Check constraints can go on the column SQL here
578 db_params = field.db_parameters(connection=self.connection)
579 if db_params["check"]:
580 definition += " " + self.sql_check_constraint % db_params
581 if (
582 isinstance(field, ForeignKeyField)
583 and self.connection.features.supports_foreign_keys
584 and field.db_constraint
585 ):
586 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
587 # Add FK constraint inline, if supported.
588 if self.sql_create_column_inline_fk:
589 to_table = field.remote_field.model.model_options.db_table
590 field_name = field.remote_field.field_name
591 if field_name is None:
592 raise ValueError("Foreign key field_name cannot be None")
593 to_field = field.remote_field.model._model_meta.get_forward_field(
594 field_name
595 )
596 to_column = to_field.column
597 namespace, _ = split_identifier(model.model_options.db_table)
598 definition += " " + self.sql_create_column_inline_fk % {
599 "name": self._fk_constraint_name(model, field, constraint_suffix),
600 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
601 "column": self.quote_name(field.column),
602 "to_table": self.quote_name(to_table),
603 "to_column": self.quote_name(to_column),
604 "deferrable": self.connection.ops.deferrable_sql(),
605 }
606 # Otherwise, add FK constraints later.
607 else:
608 self.deferred_sql.append(
609 self._create_fk_sql(model, field, constraint_suffix)
610 )
611 # Build the SQL and run it
612 sql = self.sql_create_column % {
613 "table": self.quote_name(model.model_options.db_table),
614 "column": self.quote_name(field.column),
615 "definition": definition,
616 }
617 self.execute(sql, params)
618 # Drop the default if we need to
619 # (Plain usually does not use in-database defaults)
620 if (
621 not self.skip_default_on_alter(field)
622 and self.effective_default(field) is not None
623 ):
624 changes_sql, params = self._alter_column_default_sql(
625 model, None, field, drop=True
626 )
627 sql = self.sql_alter_column % {
628 "table": self.quote_name(model.model_options.db_table),
629 "changes": changes_sql,
630 }
631 self.execute(sql, params)
632 # Add field comment, if required.
633 if (
634 field.db_comment
635 and self.connection.features.supports_comments
636 and not self.connection.features.supports_comments_inline
637 ):
638 field_type = db_params["type"]
639 assert field_type is not None
640 self.execute(
641 *self._alter_column_comment_sql(
642 model, field, field_type, field.db_comment
643 )
644 )
645 # Add an index, if required
646 self.deferred_sql.extend(self._field_indexes_sql(model, field))
647
648 def remove_field(self, model: type[Model], field: Field) -> None:
649 """
650 Remove a field from a model. Usually involves deleting a column,
651 but for M2Ms may involve deleting a table.
652 """
653 # It might not actually have a column behind it
654 if field.db_parameters(connection=self.connection)["type"] is None:
655 return
656 # Drop any FK constraints, MySQL requires explicit deletion
657 if isinstance(field, RelatedField):
658 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
659 for fk_name in fk_names:
660 self.execute(self._delete_fk_sql(model, fk_name))
661 # Delete the column
662 sql = self.sql_delete_column % {
663 "table": self.quote_name(model.model_options.db_table),
664 "column": self.quote_name(field.column),
665 }
666 self.execute(sql)
667 # Remove all deferred statements referencing the deleted column.
668 for sql in list(self.deferred_sql):
669 if isinstance(sql, Statement) and sql.references_column(
670 model.model_options.db_table, field.column
671 ):
672 self.deferred_sql.remove(sql)
673
674 def alter_field(
675 self,
676 model: type[Model],
677 old_field: Field,
678 new_field: Field,
679 strict: bool = False,
680 ) -> None:
681 """
682 Allow a field's type, uniqueness, nullability, default, column,
683 constraints, etc. to be modified.
684 `old_field` is required to compute the necessary changes.
685 If `strict` is True, raise errors if the old column does not match
686 `old_field` precisely.
687 """
688 if not self._field_should_be_altered(old_field, new_field):
689 return
690 # Ensure this field is even column-based
691 old_db_params = old_field.db_parameters(connection=self.connection)
692 old_type = old_db_params["type"]
693 new_db_params = new_field.db_parameters(connection=self.connection)
694 new_type = new_db_params["type"]
695 if (old_type is None and not isinstance(old_field, RelatedField)) or (
696 new_type is None and not isinstance(new_field, RelatedField)
697 ):
698 raise ValueError(
699 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
700 "db_type (are you using a badly-written custom field?)",
701 )
702 elif (
703 old_type is None
704 and new_type is None
705 and isinstance(old_field, RelatedField)
706 and isinstance(old_field.remote_field, ManyToManyRel)
707 and isinstance(new_field, RelatedField)
708 and isinstance(new_field.remote_field, ManyToManyRel)
709 ):
710 # Both sides have through models; this is a no-op.
711 return
712 elif old_type is None or new_type is None:
713 raise ValueError(
714 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
715 "(you cannot alter to or from M2M fields, or add or remove "
716 "through= on M2M fields)"
717 )
718
719 self._alter_field(
720 model,
721 old_field,
722 new_field,
723 old_type,
724 new_type,
725 old_db_params,
726 new_db_params,
727 strict,
728 )
729
730 def _field_db_check(
731 self, field: Field, field_db_params: DbParameters
732 ) -> str | None:
733 # Always check constraints with the same mocked column name to avoid
734 # recreating constrains when the column is renamed.
735 check_constraints = self.connection.data_type_check_constraints
736 data = field.db_type_parameters(self.connection)
737 data["column"] = "__column_name__"
738 try:
739 return check_constraints[field.get_internal_type()] % data
740 except KeyError:
741 return None
742
743 def _alter_field(
744 self,
745 model: type[Model],
746 old_field: Field,
747 new_field: Field,
748 old_type: str,
749 new_type: str,
750 old_db_params: DbParameters,
751 new_db_params: DbParameters,
752 strict: bool = False,
753 ) -> None:
754 """Perform a "physical" (non-ManyToMany) field update."""
755 # Drop any FK constraints, we'll remake them later
756 fks_dropped = set()
757 if (
758 self.connection.features.supports_foreign_keys
759 and isinstance(old_field, ForeignKeyField)
760 and old_field.db_constraint
761 and self._field_should_be_altered(
762 old_field,
763 new_field,
764 ignore={"db_comment"},
765 )
766 ):
767 fk_names = self._constraint_names(
768 model, [old_field.column], foreign_key=True
769 )
770 if strict and len(fk_names) != 1:
771 raise ValueError(
772 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
773 )
774 for fk_name in fk_names:
775 fks_dropped.add((old_field.column,))
776 self.execute(self._delete_fk_sql(model, fk_name))
777 # Has unique been removed?
778 if old_field.primary_key and (
779 not new_field.primary_key
780 or self._field_became_primary_key(old_field, new_field)
781 ):
782 # Find the unique constraint for this field
783 meta_constraint_names = {
784 constraint.name for constraint in model.model_options.constraints
785 }
786 constraint_names = self._constraint_names(
787 model,
788 [old_field.column],
789 unique=True,
790 primary_key=False,
791 exclude=meta_constraint_names,
792 )
793 if strict and len(constraint_names) != 1:
794 raise ValueError(
795 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
796 )
797 for constraint_name in constraint_names:
798 sql = self._delete_unique_sql(model, constraint_name)
799 if sql is not None:
800 self.execute(sql)
801 # Drop incoming FK constraints if the field is a primary key or unique,
802 # which might be a to_field target, and things are going to change.
803 old_collation = old_db_params.get("collation")
804 new_collation = new_db_params.get("collation")
805 drop_foreign_keys = (
806 self.connection.features.supports_foreign_keys
807 and (old_field.primary_key and new_field.primary_key)
808 and ((old_type != new_type) or (old_collation != new_collation))
809 )
810 if drop_foreign_keys:
811 # '_model_meta.related_field' also contains M2M reverse fields, these
812 # will be filtered out
813 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
814 rel_fk_names = self._constraint_names(
815 new_rel.related_model, [new_rel.field.column], foreign_key=True
816 )
817 for fk_name in rel_fk_names:
818 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
819 # Removed an index? (no strict check, as multiple indexes are possible)
820 # Remove indexes if db_index switched to False or a unique constraint
821 # will now be used in lieu of an index. The following lines from the
822 # truth table show all True cases; the rest are False:
823 #
824 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
825 # ------------------------------------------------------------------------------
826 # True | False | False | False
827 # True | False | False | True
828 # True | False | True | True
829 if (
830 isinstance(old_field, ForeignKeyField)
831 and old_field.db_index
832 and not old_field.primary_key
833 and (
834 not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
835 or new_field.primary_key
836 )
837 ):
838 # Find the index for this field
839 meta_index_names = {index.name for index in model.model_options.indexes}
840 # Retrieve only BTREE indexes since this is what's created with
841 # db_index=True.
842 index_names = self._constraint_names(
843 model,
844 [old_field.column],
845 index=True,
846 type_=Index.suffix,
847 exclude=meta_index_names,
848 )
849 for index_name in index_names:
850 # The only way to check if an index was created with
851 # db_index=True or with Index(['field'], name='foo')
852 # is to look at its name (refs #28053).
853 self.execute(self._delete_index_sql(model, index_name))
854 # Change check constraints?
855 old_db_check = self._field_db_check(old_field, old_db_params)
856 new_db_check = self._field_db_check(new_field, new_db_params)
857 if old_db_check != new_db_check and old_db_check:
858 meta_constraint_names = {
859 constraint.name for constraint in model.model_options.constraints
860 }
861 constraint_names = self._constraint_names(
862 model,
863 [old_field.column],
864 check=True,
865 exclude=meta_constraint_names,
866 )
867 if strict and len(constraint_names) != 1:
868 raise ValueError(
869 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
870 )
871 for constraint_name in constraint_names:
872 sql = self._delete_check_sql(model, constraint_name)
873 if sql is not None:
874 self.execute(sql)
875 # Have they renamed the column?
876 if old_field.column != new_field.column:
877 self.execute(
878 self._rename_field_sql(
879 model.model_options.db_table, old_field, new_field, new_type
880 )
881 )
882 # Rename all references to the renamed column.
883 for sql in self.deferred_sql:
884 if isinstance(sql, Statement):
885 sql.rename_column_references(
886 model.model_options.db_table, old_field.column, new_field.column
887 )
888 # Next, start accumulating actions to do
889 actions = []
890 null_actions = []
891 post_actions = []
892 # Type suffix change? (e.g. auto increment).
893 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
894 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
895 # Type, collation, or comment change?
896 if (
897 old_type != new_type
898 or old_type_suffix != new_type_suffix
899 or old_collation != new_collation
900 or (
901 self.connection.features.supports_comments
902 and old_field.db_comment != new_field.db_comment
903 )
904 ):
905 fragment, other_actions = self._alter_column_type_sql(
906 model, old_field, new_field, new_type, old_collation, new_collation
907 )
908 actions.append(fragment)
909 post_actions.extend(other_actions)
910 # When changing a column NULL constraint to NOT NULL with a given
911 # default value, we need to perform 4 steps:
912 # 1. Add a default for new incoming writes
913 # 2. Update existing NULL rows with new default
914 # 3. Replace NULL constraint with NOT NULL
915 # 4. Drop the default again.
916 # Default change?
917 needs_database_default = False
918 if old_field.allow_null and not new_field.allow_null:
919 old_default = self.effective_default(old_field)
920 new_default = self.effective_default(new_field)
921 if (
922 not self.skip_default_on_alter(new_field)
923 and old_default != new_default
924 and new_default is not None
925 ):
926 needs_database_default = True
927 actions.append(
928 self._alter_column_default_sql(model, old_field, new_field)
929 )
930 # Nullability change?
931 if old_field.allow_null != new_field.allow_null:
932 fragment = self._alter_column_null_sql(model, old_field, new_field)
933 if fragment:
934 null_actions.append(fragment)
935 # Only if we have a default and there is a change from NULL to NOT NULL
936 four_way_default_alteration = new_field.has_default() and (
937 old_field.allow_null and not new_field.allow_null
938 )
939 if actions or null_actions:
940 if not four_way_default_alteration:
941 # If we don't have to do a 4-way default alteration we can
942 # directly run a (NOT) NULL alteration
943 actions += null_actions
944 # Combine actions together if we can (e.g. postgres)
945 if self.connection.features.supports_combined_alters and actions:
946 sql, params = tuple(zip(*actions))
947 actions = [(", ".join(sql), sum(params, []))]
948 # Apply those actions
949 for sql, params in actions:
950 self.execute(
951 self.sql_alter_column
952 % {
953 "table": self.quote_name(model.model_options.db_table),
954 "changes": sql,
955 },
956 params,
957 )
958 if four_way_default_alteration:
959 # Update existing rows with default value
960 self.execute(
961 self.sql_update_with_default
962 % {
963 "table": self.quote_name(model.model_options.db_table),
964 "column": self.quote_name(new_field.column),
965 "default": "%s",
966 },
967 [new_default],
968 )
969 # Since we didn't run a NOT NULL change before we need to do it
970 # now
971 for sql, params in null_actions:
972 self.execute(
973 self.sql_alter_column
974 % {
975 "table": self.quote_name(model.model_options.db_table),
976 "changes": sql,
977 },
978 params,
979 )
980 if post_actions:
981 for sql, params in post_actions:
982 self.execute(sql, params)
983 # If primary_key changed to False, delete the primary key constraint.
984 if old_field.primary_key and not new_field.primary_key:
985 self._delete_primary_key(model, strict)
986
987 # Added an index? Add an index if db_index switched to True or a unique
988 # constraint will no longer be used in lieu of an index. The following
989 # lines from the truth table show all True cases; the rest are False:
990 #
991 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
992 # ------------------------------------------------------------------------------
993 # False | False | True | False
994 # False | True | True | False
995 # True | True | True | False
996 if (
997 (
998 not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
999 or old_field.primary_key
1000 )
1001 and isinstance(new_field, ForeignKeyField)
1002 and new_field.db_index
1003 and not new_field.primary_key
1004 ):
1005 self.execute(self._create_index_sql(model, fields=[new_field]))
1006 # Type alteration on primary key? Then we need to alter the column
1007 # referring to us.
1008 rels_to_update = []
1009 if drop_foreign_keys:
1010 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1011 # Changed to become primary key?
1012 if self._field_became_primary_key(old_field, new_field):
1013 # Make the new one
1014 self.execute(self._create_primary_key_sql(model, new_field))
1015 # Update all referencing columns
1016 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1017 # Handle our type alters on the other end of rels from the PK stuff above
1018 for old_rel, new_rel in rels_to_update:
1019 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1020 rel_type = rel_db_params["type"]
1021 rel_collation = rel_db_params.get("collation")
1022 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1023 old_rel_collation = old_rel_db_params.get("collation")
1024 fragment, other_actions = self._alter_column_type_sql(
1025 new_rel.related_model,
1026 old_rel.field,
1027 new_rel.field,
1028 rel_type,
1029 old_rel_collation,
1030 rel_collation,
1031 )
1032 self.execute(
1033 self.sql_alter_column
1034 % {
1035 "table": self.quote_name(
1036 new_rel.related_model.model_options.db_table
1037 ),
1038 "changes": fragment[0],
1039 },
1040 fragment[1],
1041 )
1042 for sql, params in other_actions:
1043 self.execute(sql, params)
1044 # Does it have a foreign key?
1045 if (
1046 self.connection.features.supports_foreign_keys
1047 and isinstance(new_field, ForeignKeyField)
1048 and (
1049 fks_dropped
1050 or not isinstance(old_field, ForeignKeyField)
1051 or not old_field.db_constraint
1052 )
1053 and new_field.db_constraint
1054 ):
1055 self.execute(
1056 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1057 )
1058 # Rebuild FKs that pointed to us if we previously had to drop them
1059 if drop_foreign_keys:
1060 for _, rel in rels_to_update:
1061 if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
1062 self.execute(
1063 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1064 )
1065 # Does it have check constraints we need to add?
1066 if old_db_check != new_db_check and new_db_check:
1067 constraint_name = self._create_index_name(
1068 model.model_options.db_table, [new_field.column], suffix="_check"
1069 )
1070 new_check = new_db_params["check"]
1071 assert new_check is not None # Guaranteed by new_db_check check above
1072 sql = self._create_check_sql(model, constraint_name, new_check)
1073 if sql is not None:
1074 self.execute(sql)
1075 # Drop the default if we need to
1076 # (Plain usually does not use in-database defaults)
1077 if needs_database_default:
1078 changes_sql, params = self._alter_column_default_sql(
1079 model, old_field, new_field, drop=True
1080 )
1081 sql = self.sql_alter_column % {
1082 "table": self.quote_name(model.model_options.db_table),
1083 "changes": changes_sql,
1084 }
1085 self.execute(sql, params)
1086
1087 def _alter_column_null_sql(
1088 self, model: type[Model], old_field: Field, new_field: Field
1089 ) -> tuple[str, list[Any]]:
1090 """
1091 Hook to specialize column null alteration.
1092
1093 Return a (sql, params) fragment to set a column to null or non-null
1094 as required by new_field, or None if no changes are required.
1095 """
1096 new_db_params = new_field.db_parameters(connection=self.connection)
1097 sql = (
1098 self.sql_alter_column_null
1099 if new_field.allow_null
1100 else self.sql_alter_column_not_null
1101 )
1102 return (
1103 sql
1104 % {
1105 "column": self.quote_name(new_field.column),
1106 "type": new_db_params["type"],
1107 },
1108 [],
1109 )
1110
1111 def _alter_column_default_sql(
1112 self,
1113 model: type[Model],
1114 old_field: Field | None,
1115 new_field: Field,
1116 drop: bool = False,
1117 ) -> tuple[str, list[Any]]:
1118 """
1119 Hook to specialize column default alteration.
1120
1121 Return a (sql, params) fragment to add or drop (depending on the drop
1122 argument) a default to new_field's column.
1123 """
1124 new_default = self.effective_default(new_field)
1125 default = self._column_default_sql(new_field)
1126 params = [new_default]
1127
1128 if drop:
1129 params = []
1130 elif self.connection.features.requires_literal_defaults:
1131 # Some databases (Oracle) can't take defaults as a parameter
1132 # If this is the case, the SchemaEditor for that database should
1133 # implement prepare_default().
1134 default = self.prepare_default(new_default)
1135 params = []
1136
1137 new_db_params = new_field.db_parameters(connection=self.connection)
1138 if drop:
1139 if new_field.allow_null:
1140 sql = self.sql_alter_column_no_default_null
1141 else:
1142 sql = self.sql_alter_column_no_default
1143 else:
1144 sql = self.sql_alter_column_default
1145 return (
1146 sql
1147 % {
1148 "column": self.quote_name(new_field.column),
1149 "type": new_db_params["type"],
1150 "default": default,
1151 },
1152 params,
1153 )
1154
1155 def _alter_column_type_sql(
1156 self,
1157 model: type[Model],
1158 old_field: Field,
1159 new_field: Field,
1160 new_type: str,
1161 old_collation: str | None,
1162 new_collation: str | None,
1163 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1164 """
1165 Hook to specialize column type alteration for different backends,
1166 for cases when a creation type is different to an alteration type
1167 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1168
1169 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1170 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1171 run once the field is altered.
1172 """
1173 other_actions = []
1174 if collate_sql := self._collate_sql(
1175 new_collation, old_collation, model.model_options.db_table
1176 ):
1177 collate_sql = f" {collate_sql}"
1178 else:
1179 collate_sql = ""
1180 # Comment change?
1181 from plain.models.fields.related import ManyToManyField
1182
1183 comment_sql = ""
1184 if self.connection.features.supports_comments and not isinstance(
1185 new_field, ManyToManyField
1186 ):
1187 if old_field.db_comment != new_field.db_comment:
1188 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1189 # 'COMMENT ON ...' at the same time.
1190 sql, params = self._alter_column_comment_sql(
1191 model, new_field, new_type, new_field.db_comment
1192 )
1193 if sql:
1194 other_actions.append((sql, params))
1195 if new_field.db_comment:
1196 comment_sql = self._comment_sql(new_field.db_comment)
1197 return (
1198 (
1199 self.sql_alter_column_type
1200 % {
1201 "column": self.quote_name(new_field.column),
1202 "type": new_type,
1203 "collation": collate_sql,
1204 "comment": comment_sql,
1205 },
1206 [],
1207 ),
1208 other_actions,
1209 )
1210
1211 def _alter_column_comment_sql(
1212 self,
1213 model: type[Model],
1214 new_field: Field,
1215 new_type: str,
1216 new_db_comment: str | None,
1217 ) -> tuple[str, list[Any]]:
1218 return (
1219 self.sql_alter_column_comment
1220 % {
1221 "table": self.quote_name(model.model_options.db_table),
1222 "column": self.quote_name(new_field.column),
1223 "comment": self._comment_sql(new_db_comment),
1224 },
1225 [],
1226 )
1227
1228 def _comment_sql(self, comment: str | None) -> str:
1229 return self.quote_value(comment or "")
1230
1231 def _alter_many_to_many(
1232 self,
1233 model: type[Model],
1234 old_field: ManyToManyField,
1235 new_field: ManyToManyField,
1236 strict: bool,
1237 ) -> None:
1238 """Alter M2Ms to repoint their to= endpoints."""
1239 # Type narrow for ManyToManyField.remote_field
1240 old_rel: ManyToManyRel = old_field.remote_field
1241 new_rel: ManyToManyRel = new_field.remote_field
1242
1243 # Rename the through table
1244 if (
1245 old_rel.through.model_options.db_table
1246 != new_rel.through.model_options.db_table
1247 ):
1248 self.alter_db_table(
1249 old_rel.through,
1250 old_rel.through.model_options.db_table,
1251 new_rel.through.model_options.db_table,
1252 )
1253 # Repoint the FK to the other side
1254 old_reverse_field = old_rel.through._model_meta.get_forward_field(
1255 old_field.m2m_reverse_field_name()
1256 )
1257 new_reverse_field = new_rel.through._model_meta.get_forward_field(
1258 new_field.m2m_reverse_field_name()
1259 )
1260 self.alter_field(
1261 new_rel.through,
1262 # The field that points to the target model is needed, so we can
1263 # tell alter_field to change it - this is m2m_reverse_field_name()
1264 # (as opposed to m2m_field_name(), which points to our model).
1265 old_reverse_field,
1266 new_reverse_field,
1267 )
1268 old_m2m_field = old_rel.through._model_meta.get_forward_field(
1269 old_field.m2m_field_name()
1270 )
1271 new_m2m_field = new_rel.through._model_meta.get_forward_field(
1272 new_field.m2m_field_name()
1273 )
1274 self.alter_field(
1275 new_rel.through,
1276 # for self-referential models we need to alter field from the other end too
1277 old_m2m_field,
1278 new_m2m_field,
1279 )
1280
1281 def _create_index_name(
1282 self, table_name: str, column_names: list[str], suffix: str = ""
1283 ) -> str:
1284 """
1285 Generate a unique name for an index/unique constraint.
1286
1287 The name is divided into 3 parts: the table name, the column names,
1288 and a unique digest and suffix.
1289 """
1290 _, table_name = split_identifier(table_name)
1291 hash_suffix_part = (
1292 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1293 )
1294 max_length = self.connection.ops.max_name_length() or 200
1295 # If everything fits into max_length, use that name.
1296 index_name = "{}_{}_{}".format(
1297 table_name, "_".join(column_names), hash_suffix_part
1298 )
1299 if len(index_name) <= max_length:
1300 return index_name
1301 # Shorten a long suffix.
1302 if len(hash_suffix_part) > max_length / 3:
1303 hash_suffix_part = hash_suffix_part[: max_length // 3]
1304 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1305 index_name = "{}_{}_{}".format(
1306 table_name[:other_length],
1307 "_".join(column_names)[:other_length],
1308 hash_suffix_part,
1309 )
1310 # Prepend D if needed to prevent the name from starting with an
1311 # underscore or a number (not permitted on Oracle).
1312 if index_name[0] == "_" or index_name[0].isdigit():
1313 index_name = f"D{index_name[:-1]}"
1314 return index_name
1315
1316 def _index_condition_sql(self, condition: str | None) -> str:
1317 if condition:
1318 return " WHERE " + condition
1319 return ""
1320
1321 def _index_include_sql(
1322 self, model: type[Model], columns: list[str] | None
1323 ) -> str | Statement:
1324 if not columns or not self.connection.features.supports_covering_indexes:
1325 return ""
1326 return Statement(
1327 " INCLUDE (%(columns)s)",
1328 columns=Columns(model.model_options.db_table, columns, self.quote_name),
1329 )
1330
1331 def _create_index_sql(
1332 self,
1333 model: type[Model],
1334 *,
1335 fields: list[Field] | None = None,
1336 name: str | None = None,
1337 suffix: str = "",
1338 using: str = "",
1339 col_suffixes: tuple[str, ...] = (),
1340 sql: str | None = None,
1341 opclasses: tuple[str, ...] = (),
1342 condition: str | None = None,
1343 include: list[str] | None = None,
1344 expressions: Any = None,
1345 ) -> Statement:
1346 """
1347 Return the SQL statement to create the index for one or several fields
1348 or expressions. `sql` can be specified if the syntax differs from the
1349 standard (GIS indexes, ...).
1350 """
1351 fields = fields or []
1352 expressions = expressions or []
1353 compiler = Query(model, alias_cols=False).get_compiler()
1354 columns = [field.column for field in fields]
1355 sql_create_index = sql or self.sql_create_index
1356 table = model.model_options.db_table
1357
1358 def create_index_name(*args: Any, **kwargs: Any) -> str:
1359 nonlocal name
1360 if name is None:
1361 name = self._create_index_name(*args, **kwargs)
1362 return self.quote_name(name)
1363
1364 return Statement(
1365 sql_create_index,
1366 table=Table(table, self.quote_name),
1367 name=IndexName(table, columns, suffix, create_index_name),
1368 using=using,
1369 columns=(
1370 self._index_columns(table, columns, col_suffixes, opclasses)
1371 if columns
1372 else Expressions(table, expressions, compiler, self.quote_value)
1373 ),
1374 extra="",
1375 condition=self._index_condition_sql(condition),
1376 include=self._index_include_sql(model, include),
1377 )
1378
1379 def _delete_index_sql(
1380 self, model: type[Model], name: str, sql: str | None = None
1381 ) -> Statement:
1382 return Statement(
1383 sql or self.sql_delete_index,
1384 table=Table(model.model_options.db_table, self.quote_name),
1385 name=self.quote_name(name),
1386 )
1387
1388 def _rename_index_sql(
1389 self, model: type[Model], old_name: str, new_name: str
1390 ) -> Statement:
1391 return Statement(
1392 self.sql_rename_index,
1393 table=Table(model.model_options.db_table, self.quote_name),
1394 old_name=self.quote_name(old_name),
1395 new_name=self.quote_name(new_name),
1396 )
1397
1398 def _index_columns(
1399 self,
1400 table: str,
1401 columns: list[str],
1402 col_suffixes: tuple[str, ...],
1403 opclasses: tuple[str, ...],
1404 ) -> Columns:
1405 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1406
1407 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1408 """
1409 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1410 """
1411 output: list[Statement | None] = []
1412 for field in model._model_meta.local_fields:
1413 output.extend(self._field_indexes_sql(model, field))
1414
1415 for index in model.model_options.indexes:
1416 if (
1417 not index.contains_expressions
1418 or self.connection.features.supports_expression_indexes
1419 ):
1420 output.append(index.create_sql(model, self))
1421 return output
1422
1423 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1424 """
1425 Return a list of all index SQL statements for the specified field.
1426 """
1427 output: list[Statement] = []
1428 if self._field_should_be_indexed(model, field):
1429 output.append(self._create_index_sql(model, fields=[field]))
1430 return output
1431
1432 def _field_should_be_altered(
1433 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1434 ) -> bool:
1435 ignore = ignore or set()
1436 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1437 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1438 # Don't alter when:
1439 # - changing only a field name
1440 # - changing an attribute that doesn't affect the schema
1441 # - changing an attribute in the provided set of ignored attributes
1442 # - adding only a db_column and the column name is not changed
1443 for attr in ignore.union(old_field.non_db_attrs):
1444 old_kwargs.pop(attr, None)
1445 for attr in ignore.union(new_field.non_db_attrs):
1446 new_kwargs.pop(attr, None)
1447 return self.quote_name(old_field.column) != self.quote_name(
1448 new_field.column
1449 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1450
1451 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1452 if isinstance(field, ForeignKeyField):
1453 return bool(field.remote_field) and field.db_index and not field.primary_key
1454 return False
1455
1456 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1457 return not old_field.primary_key and new_field.primary_key
1458
1459 def _rename_field_sql(
1460 self, table: str, old_field: Field, new_field: Field, new_type: str
1461 ) -> str:
1462 return self.sql_rename_column % {
1463 "table": self.quote_name(table),
1464 "old_column": self.quote_name(old_field.column),
1465 "new_column": self.quote_name(new_field.column),
1466 "type": new_type,
1467 }
1468
1469 def _create_fk_sql(
1470 self, model: type[Model], field: ForeignKeyField, suffix: str
1471 ) -> Statement:
1472 table = Table(model.model_options.db_table, self.quote_name)
1473 name = self._fk_constraint_name(model, field, suffix)
1474 column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1475 to_table = Table(
1476 field.target_field.model.model_options.db_table, self.quote_name
1477 )
1478 to_column = Columns(
1479 field.target_field.model.model_options.db_table,
1480 [field.target_field.column],
1481 self.quote_name,
1482 )
1483 deferrable = self.connection.ops.deferrable_sql()
1484 return Statement(
1485 self.sql_create_fk,
1486 table=table,
1487 name=name,
1488 column=column,
1489 to_table=to_table,
1490 to_column=to_column,
1491 deferrable=deferrable,
1492 )
1493
1494 def _fk_constraint_name(
1495 self, model: type[Model], field: ForeignKeyField, suffix: str
1496 ) -> ForeignKeyName:
1497 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1498 return self.quote_name(self._create_index_name(*args, **kwargs))
1499
1500 return ForeignKeyName(
1501 model.model_options.db_table,
1502 [field.column],
1503 split_identifier(field.target_field.model.model_options.db_table)[1],
1504 [field.target_field.column],
1505 suffix,
1506 create_fk_name,
1507 )
1508
1509 def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1510 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1511
1512 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1513 if deferrable is None:
1514 return ""
1515 if deferrable == Deferrable.DEFERRED:
1516 return " DEFERRABLE INITIALLY DEFERRED"
1517 if deferrable == Deferrable.IMMEDIATE:
1518 return " DEFERRABLE INITIALLY IMMEDIATE"
1519 return ""
1520
1521 def _unique_sql(
1522 self,
1523 model: type[Model],
1524 fields: Iterable[Field],
1525 name: str,
1526 condition: str | None = None,
1527 deferrable: Deferrable | None = None,
1528 include: list[str] | None = None,
1529 opclasses: tuple[str, ...] | None = None,
1530 expressions: Any = None,
1531 ) -> str | None:
1532 if (
1533 deferrable
1534 and not self.connection.features.supports_deferrable_unique_constraints
1535 ):
1536 return None
1537 if condition or include or opclasses or expressions:
1538 # Databases support conditional, covering, and functional unique
1539 # constraints via a unique index.
1540 sql = self._create_unique_sql(
1541 model,
1542 fields,
1543 name=name,
1544 condition=condition,
1545 include=include,
1546 opclasses=opclasses,
1547 expressions=expressions,
1548 )
1549 if sql:
1550 self.deferred_sql.append(sql)
1551 return None
1552 constraint = self.sql_unique_constraint % {
1553 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1554 "deferrable": self._deferrable_constraint_sql(deferrable),
1555 }
1556 return self.sql_constraint % {
1557 "name": self.quote_name(name),
1558 "constraint": constraint,
1559 }
1560
1561 def _create_unique_sql(
1562 self,
1563 model: type[Model],
1564 fields: Iterable[Field],
1565 name: str | None = None,
1566 condition: str | None = None,
1567 deferrable: Deferrable | None = None,
1568 include: list[str] | None = None,
1569 opclasses: tuple[str, ...] | None = None,
1570 expressions: Any = None,
1571 ) -> Statement | None:
1572 if (
1573 (
1574 deferrable
1575 and not self.connection.features.supports_deferrable_unique_constraints
1576 )
1577 or (condition and not self.connection.features.supports_partial_indexes)
1578 or (include and not self.connection.features.supports_covering_indexes)
1579 or (
1580 expressions and not self.connection.features.supports_expression_indexes
1581 )
1582 ):
1583 return None
1584
1585 compiler = Query(model, alias_cols=False).get_compiler()
1586 table = model.model_options.db_table
1587 columns = [field.column for field in fields]
1588 constraint_name: IndexName | str
1589 if name is None:
1590 constraint_name = self._unique_constraint_name(table, columns, quote=True)
1591 else:
1592 constraint_name = self.quote_name(name)
1593 if condition or include or opclasses or expressions:
1594 sql = self.sql_create_unique_index
1595 else:
1596 sql = self.sql_create_unique
1597 if columns:
1598 columns_obj: Columns | Expressions = self._index_columns(
1599 table, columns, col_suffixes=(), opclasses=opclasses or ()
1600 )
1601 else:
1602 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1603 return Statement(
1604 sql,
1605 table=Table(table, self.quote_name),
1606 name=constraint_name,
1607 columns=columns_obj,
1608 condition=self._index_condition_sql(condition),
1609 deferrable=self._deferrable_constraint_sql(deferrable),
1610 include=self._index_include_sql(model, include),
1611 )
1612
1613 def _unique_constraint_name(
1614 self, table: str, columns: list[str], quote: bool = True
1615 ) -> IndexName | str:
1616 if quote:
1617
1618 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1619 return self.quote_name(self._create_index_name(*args, **kwargs))
1620
1621 else:
1622 create_unique_name = self._create_index_name
1623
1624 return IndexName(table, columns, "_uniq", create_unique_name)
1625
1626 def _delete_unique_sql(
1627 self,
1628 model: type[Model],
1629 name: str,
1630 condition: str | None = None,
1631 deferrable: Deferrable | None = None,
1632 include: list[str] | None = None,
1633 opclasses: tuple[str, ...] | None = None,
1634 expressions: Any = None,
1635 ) -> Statement | None:
1636 if (
1637 (
1638 deferrable
1639 and not self.connection.features.supports_deferrable_unique_constraints
1640 )
1641 or (condition and not self.connection.features.supports_partial_indexes)
1642 or (include and not self.connection.features.supports_covering_indexes)
1643 or (
1644 expressions and not self.connection.features.supports_expression_indexes
1645 )
1646 ):
1647 return None
1648 if condition or include or opclasses or expressions:
1649 sql = self.sql_delete_index
1650 else:
1651 sql = self.sql_delete_unique
1652 return self._delete_constraint_sql(sql, model, name)
1653
1654 def _check_sql(self, name: str, check: str) -> str:
1655 return self.sql_constraint % {
1656 "name": self.quote_name(name),
1657 "constraint": self.sql_check_constraint % {"check": check},
1658 }
1659
1660 def _create_check_sql(
1661 self, model: type[Model], name: str, check: str
1662 ) -> Statement | None:
1663 if not self.connection.features.supports_table_check_constraints:
1664 return None
1665 return Statement(
1666 self.sql_create_check,
1667 table=Table(model.model_options.db_table, self.quote_name),
1668 name=self.quote_name(name),
1669 check=check,
1670 )
1671
1672 def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1673 if not self.connection.features.supports_table_check_constraints:
1674 return None
1675 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1676
1677 def _delete_constraint_sql(
1678 self, template: str, model: type[Model], name: str
1679 ) -> Statement:
1680 return Statement(
1681 template,
1682 table=Table(model.model_options.db_table, self.quote_name),
1683 name=self.quote_name(name),
1684 )
1685
1686 def _constraint_names(
1687 self,
1688 model: type[Model],
1689 column_names: list[str] | None = None,
1690 unique: bool | None = None,
1691 primary_key: bool | None = None,
1692 index: bool | None = None,
1693 foreign_key: bool | None = None,
1694 check: bool | None = None,
1695 type_: str | None = None,
1696 exclude: set[str] | None = None,
1697 ) -> list[str]:
1698 """Return all constraint names matching the columns and conditions."""
1699 if column_names is not None:
1700 column_names = [
1701 self.connection.introspection.identifier_converter(
1702 truncate_name(name, self.connection.ops.max_name_length())
1703 )
1704 if self.connection.features.truncates_names
1705 else self.connection.introspection.identifier_converter(name)
1706 for name in column_names
1707 ]
1708 with self.connection.cursor() as cursor:
1709 constraints = self.connection.introspection.get_constraints(
1710 cursor, model.model_options.db_table
1711 )
1712 result: list[str] = []
1713 for name, infodict in constraints.items():
1714 if column_names is None or column_names == infodict["columns"]:
1715 if unique is not None and infodict["unique"] != unique:
1716 continue
1717 if primary_key is not None and infodict["primary_key"] != primary_key:
1718 continue
1719 if index is not None and infodict["index"] != index:
1720 continue
1721 if check is not None and infodict["check"] != check:
1722 continue
1723 if foreign_key is not None and not infodict["foreign_key"]:
1724 continue
1725 if type_ is not None and infodict["type"] != type_:
1726 continue
1727 if not exclude or name not in exclude:
1728 result.append(name)
1729 return result
1730
1731 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1732 constraint_names = self._constraint_names(model, primary_key=True)
1733 if strict and len(constraint_names) != 1:
1734 raise ValueError(
1735 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1736 )
1737 for constraint_name in constraint_names:
1738 self.execute(self._delete_primary_key_sql(model, constraint_name))
1739
1740 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1741 return Statement(
1742 self.sql_create_pk,
1743 table=Table(model.model_options.db_table, self.quote_name),
1744 name=self.quote_name(
1745 self._create_index_name(
1746 model.model_options.db_table, [field.column], suffix="_pk"
1747 )
1748 ),
1749 columns=Columns(
1750 model.model_options.db_table, [field.column], self.quote_name
1751 ),
1752 )
1753
1754 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1755 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1756
1757 def _collate_sql(
1758 self,
1759 collation: str | None,
1760 old_collation: str | None = None,
1761 table_name: str | None = None,
1762 ) -> str:
1763 return "COLLATE " + self.quote_name(collation) if collation else ""