1from __future__ import annotations
2
3import logging
4import operator
5from abc import ABC, abstractmethod
6from collections.abc import Generator
7from datetime import datetime
8from typing import TYPE_CHECKING, Any
9
10if TYPE_CHECKING:
11 from typing import Self
12
13from plain.models.backends.ddl_references import (
14 Columns,
15 Expressions,
16 ForeignKeyName,
17 IndexName,
18 Statement,
19 Table,
20)
21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
22from plain.models.constraints import Deferrable
23from plain.models.fields import Field
24from plain.models.fields.related import ForeignKeyField, RelatedField
25from plain.models.fields.reverse_related import ManyToManyRel
26from plain.models.indexes import Index
27from plain.models.sql import Query
28from plain.models.transaction import TransactionManagementError, atomic
29from plain.utils import timezone
30
31if TYPE_CHECKING:
32 from collections.abc import Iterable
33
34 from plain.models.backends.base.base import BaseDatabaseWrapper
35 from plain.models.base import Model
36 from plain.models.constraints import BaseConstraint
37 from plain.models.fields import Field
38 from plain.models.fields.related import ForeignKeyField, ManyToManyField
39 from plain.models.fields.reverse_related import ManyToManyRel
40
41logger = logging.getLogger("plain.models.backends.schema")
42
43
44def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
45 """
46 When altering the given field, must constraints on its model from the given
47 relation be temporarily dropped?
48 """
49 from plain.models.fields.related import ManyToManyField
50
51 field = relation.field
52 if isinstance(field, ManyToManyField):
53 # M2M reverse field
54 return False
55 if altered_field.primary_key:
56 # Foreign key constraint on the primary key, which is being altered.
57 return True
58 # ForeignKeyField always targets 'id'
59 return altered_field.name == "id"
60
61
62def _all_related_fields(model: type[Model]) -> list[Any]:
63 # Related fields must be returned in a deterministic order.
64 return sorted(
65 model._model_meta._get_fields(
66 forward=False,
67 reverse=True,
68 ),
69 key=operator.attrgetter("name"),
70 )
71
72
73def _related_non_m2m_objects(
74 old_field: Field, new_field: Field
75) -> Generator[tuple[Any, Any], None, None]:
76 # Filter out m2m objects from reverse relations.
77 # Return (old_relation, new_relation) tuples.
78 related_fields = zip(
79 (
80 obj
81 for obj in _all_related_fields(old_field.model)
82 if _is_relevant_relation(obj, old_field)
83 ),
84 (
85 obj
86 for obj in _all_related_fields(new_field.model)
87 if _is_relevant_relation(obj, new_field)
88 ),
89 )
90 for old_rel, new_rel in related_fields:
91 yield old_rel, new_rel
92 yield from _related_non_m2m_objects(
93 old_rel.remote_field,
94 new_rel.remote_field,
95 )
96
97
98class BaseDatabaseSchemaEditor(ABC):
99 """
100 This class and its subclasses are responsible for emitting schema-changing
101 statements to the databases - model creation/removal/alteration, field
102 renaming, index fiddling, and so on.
103 """
104
105 # Overrideable SQL templates
106 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
107 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
108 sql_delete_table = "DROP TABLE %(table)s CASCADE"
109
110 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
111 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
112 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
113 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
114 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
115 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
116 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
117 sql_alter_column_no_default_null = sql_alter_column_no_default
118 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
119 sql_rename_column = (
120 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
121 )
122 sql_update_with_default = (
123 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
124 )
125
126 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
127 sql_check_constraint = "CHECK (%(check)s)"
128 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
129 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
130
131 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
132 sql_delete_check = sql_delete_constraint
133
134 sql_create_unique = (
135 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
136 "UNIQUE (%(columns)s)%(deferrable)s"
137 )
138 sql_delete_unique = sql_delete_constraint
139
140 sql_create_fk = (
141 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
142 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
143 )
144 sql_create_inline_fk = None
145 sql_create_column_inline_fk = None
146 sql_delete_fk = sql_delete_constraint
147
148 sql_create_index = (
149 "CREATE INDEX %(name)s ON %(table)s "
150 "(%(columns)s)%(include)s%(extra)s%(condition)s"
151 )
152 sql_create_unique_index = (
153 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
154 "(%(columns)s)%(include)s%(condition)s"
155 )
156 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
157 sql_delete_index = "DROP INDEX %(name)s"
158
159 sql_create_pk = (
160 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
161 )
162 sql_delete_pk = sql_delete_constraint
163
164 sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
165 sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
166
167 def __init__(
168 self,
169 connection: BaseDatabaseWrapper,
170 atomic: bool = True,
171 ):
172 self.connection = connection
173 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
174
175 # State-managing methods
176
177 def __enter__(self) -> Self:
178 self.deferred_sql: list[Any] = []
179 self.executed_sql: list[str] = []
180 if self.atomic_migration:
181 self.atomic = atomic()
182 self.atomic.__enter__()
183 return self
184
185 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
186 if exc_type is None:
187 for sql in self.deferred_sql:
188 self.execute(sql)
189 if self.atomic_migration:
190 self.atomic.__exit__(exc_type, exc_value, traceback)
191
192 # Core utility functions
193
194 def execute(
195 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
196 ) -> None:
197 """Execute the given SQL statement, with optional parameters."""
198 if (
199 self.connection.in_atomic_block
200 and not self.connection.features.can_rollback_ddl
201 ):
202 raise TransactionManagementError(
203 "Executing DDL statements while in a transaction on databases "
204 "that can't perform a rollback is prohibited."
205 )
206 # Account for non-string statement objects.
207 sql = str(sql)
208 # Log the command we're running, then run it
209 logger.debug(
210 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
211 )
212
213 # Track executed SQL for display in migration output
214 # Store the SQL for display (interpolate params for readability)
215 if params:
216 self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
217 else:
218 self.executed_sql.append(sql)
219
220 with self.connection.cursor() as cursor:
221 cursor.execute(sql, params)
222
223 def quote_name(self, name: str) -> str:
224 return self.connection.ops.quote_name(name)
225
226 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
227 """Take a model and return its table definition."""
228 # Create column SQL, add FK deferreds if needed.
229 column_sqls = []
230 params = []
231 for field in model._model_meta.local_fields:
232 # SQL.
233 definition, extra_params = self.column_sql(model, field)
234 if definition is None:
235 continue
236 # Check constraints can go on the column SQL here.
237 db_params = field.db_parameters(connection=self.connection)
238 if db_params["check"]:
239 definition += " " + self.sql_check_constraint % db_params
240 # Autoincrement SQL (for backends with inline variant).
241 col_type_suffix = field.db_type_suffix(connection=self.connection)
242 if col_type_suffix:
243 definition += f" {col_type_suffix}"
244 if extra_params:
245 params.extend(extra_params)
246 # FK.
247 if isinstance(field, ForeignKeyField) and field.db_constraint:
248 to_table = field.remote_field.model.model_options.db_table
249 field_name = field.remote_field.field_name
250 if field_name is None:
251 raise ValueError("Foreign key field_name cannot be None")
252 to_field = field.remote_field.model._model_meta.get_forward_field(
253 field_name
254 )
255 to_column = to_field.column
256 if self.sql_create_inline_fk:
257 definition += " " + self.sql_create_inline_fk % {
258 "to_table": self.quote_name(to_table),
259 "to_column": self.quote_name(to_column),
260 }
261 elif self.connection.features.supports_foreign_keys:
262 self.deferred_sql.append(
263 self._create_fk_sql(
264 model, field, "_fk_%(to_table)s_%(to_column)s"
265 )
266 )
267 # Add the SQL to our big list.
268 column_sqls.append(f"{self.quote_name(field.column)} {definition}")
269 # Autoincrement SQL (for backends with post table definition
270 # variant).
271 if field.get_internal_type() in ("PrimaryKeyField",):
272 autoinc_sql = self.connection.ops.autoinc_sql(
273 model.model_options.db_table, field.column
274 )
275 if autoinc_sql:
276 self.deferred_sql.extend(autoinc_sql)
277 constraints = [
278 constraint.constraint_sql(model, self)
279 for constraint in model.model_options.constraints
280 ]
281 sql = self.sql_create_table % {
282 "table": self.quote_name(model.model_options.db_table),
283 "definition": ", ".join(
284 str(constraint)
285 for constraint in (*column_sqls, *constraints)
286 if constraint
287 ),
288 }
289 return sql, params
290
291 # Field <-> database mapping functions
292
293 def _iter_column_sql(
294 self,
295 column_db_type: str,
296 params: list[Any],
297 model: type[Model],
298 field: Field,
299 field_db_params: dict[str, Any],
300 include_default: bool,
301 ) -> Generator[str, None, None]:
302 yield column_db_type
303 if collation := field_db_params.get("collation"):
304 yield self._collate_sql(collation)
305 if self.connection.features.supports_comments_inline and field.db_comment:
306 yield self._comment_sql(field.db_comment)
307 # Work out nullability.
308 null = field.allow_null
309 # Include a default value, if requested.
310 include_default = (
311 include_default
312 and not self.skip_default(field)
313 and
314 # Don't include a default value if it's a nullable field and the
315 # default cannot be dropped in the ALTER COLUMN statement (e.g.
316 # MySQL longtext and longblob).
317 not (null and self.skip_default_on_alter(field))
318 )
319 if include_default:
320 default_value = self.effective_default(field)
321 if default_value is not None:
322 column_default = "DEFAULT " + self._column_default_sql(field)
323 if self.connection.features.requires_literal_defaults:
324 # Some databases can't take defaults as a parameter (Oracle).
325 # If this is the case, the individual schema backend should
326 # implement prepare_default().
327 yield column_default % self.prepare_default(default_value)
328 else:
329 yield column_default
330 params.append(default_value)
331
332 if not null:
333 yield "NOT NULL"
334 elif not self.connection.features.implied_column_null:
335 yield "NULL"
336
337 if field.primary_key:
338 yield "PRIMARY KEY"
339
340 def column_sql(
341 self, model: type[Model], field: Field, include_default: bool = False
342 ) -> tuple[str | None, list[Any] | None]:
343 """
344 Return the column definition for a field. The field must already have
345 had set_attributes_from_name() called.
346 """
347 # Get the column's type and use that as the basis of the SQL.
348 field_db_params = field.db_parameters(connection=self.connection)
349 column_db_type = field_db_params["type"]
350 # Check for fields that aren't actually columns (e.g. M2M).
351 if column_db_type is None:
352 return None, None
353 params: list[Any] = []
354 return (
355 " ".join(
356 # This appends to the params being returned.
357 self._iter_column_sql(
358 column_db_type,
359 params,
360 model,
361 field,
362 field_db_params,
363 include_default,
364 )
365 ),
366 params,
367 )
368
369 def skip_default(self, field: Field) -> bool:
370 """
371 Some backends don't accept default values for certain columns types
372 (i.e. MySQL longtext and longblob).
373 """
374 return False
375
376 def skip_default_on_alter(self, field: Field) -> bool:
377 """
378 Some backends don't accept default values for certain columns types
379 (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
380 """
381 return False
382
383 def prepare_default(self, value: Any) -> str:
384 """
385 Only used for backends which have requires_literal_defaults feature
386 """
387 raise NotImplementedError(
388 "subclasses of BaseDatabaseSchemaEditor for backends which have "
389 "requires_literal_defaults must provide a prepare_default() method"
390 )
391
392 def _column_default_sql(self, field: Field) -> str:
393 """
394 Return the SQL to use in a DEFAULT clause. The resulting string should
395 contain a '%s' placeholder for a default value.
396 """
397 return "%s"
398
399 @staticmethod
400 def _effective_default(field: Field) -> Any:
401 # This method allows testing its logic without a connection.
402 if field.has_default():
403 default = field.get_default()
404 elif (
405 not field.allow_null and not field.required and field.empty_strings_allowed
406 ):
407 if field.get_internal_type() == "BinaryField":
408 default = b""
409 else:
410 default = ""
411 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
412 internal_type = field.get_internal_type()
413 if internal_type == "DateTimeField":
414 default = timezone.now()
415 else:
416 default = datetime.now()
417 if internal_type == "DateField":
418 default = default.date()
419 elif internal_type == "TimeField":
420 default = default.time()
421 else:
422 default = None
423 return default
424
425 def effective_default(self, field: Field) -> Any:
426 """Return a field's effective database default value."""
427 return field.get_db_prep_save(self._effective_default(field), self.connection)
428
429 @abstractmethod
430 def quote_value(self, value: Any) -> str:
431 """
432 Return a quoted version of the value so it's safe to use in an SQL
433 string. This is not safe against injection from user code; it is
434 intended only for use in making SQL scripts or preparing default values
435 for particularly tricky backends (defaults are not user-defined, though,
436 so this is safe).
437 """
438 ...
439
440 # Actions
441
442 def create_model(self, model: type[Model]) -> None:
443 """
444 Create a table and any accompanying indexes or unique constraints for
445 the given `model`.
446 """
447 sql, params = self.table_sql(model)
448 # Prevent using [] as params, in the case a literal '%' is used in the
449 # definition.
450 self.execute(sql, params or None)
451
452 if self.connection.features.supports_comments:
453 # Add table comment.
454 if model.model_options.db_table_comment:
455 self.alter_db_table_comment(
456 model, None, model.model_options.db_table_comment
457 )
458 # Add column comments.
459 if not self.connection.features.supports_comments_inline:
460 for field in model._model_meta.local_fields:
461 if field.db_comment:
462 field_db_params = field.db_parameters(
463 connection=self.connection
464 )
465 field_type = field_db_params["type"]
466 self.execute(
467 *self._alter_column_comment_sql(
468 model, field, field_type, field.db_comment
469 )
470 )
471 # Add any field index (deferred as SQLite _remake_table needs it).
472 self.deferred_sql.extend(self._model_indexes_sql(model))
473
474 def delete_model(self, model: type[Model]) -> None:
475 """Delete a model from the database."""
476
477 # Delete the table
478 self.execute(
479 self.sql_delete_table
480 % {
481 "table": self.quote_name(model.model_options.db_table),
482 }
483 )
484 # Remove all deferred statements referencing the deleted table.
485 for sql in list(self.deferred_sql):
486 if isinstance(sql, Statement) and sql.references_table(
487 model.model_options.db_table
488 ):
489 self.deferred_sql.remove(sql)
490
491 def add_index(self, model: type[Model], index: Index) -> None:
492 """Add an index on a model."""
493 if (
494 index.contains_expressions
495 and not self.connection.features.supports_expression_indexes
496 ):
497 return None
498 # Index.create_sql returns interpolated SQL which makes params=None a
499 # necessity to avoid escaping attempts on execution.
500 self.execute(index.create_sql(model, self), params=None)
501
502 def remove_index(self, model: type[Model], index: Index) -> None:
503 """Remove an index from a model."""
504 if (
505 index.contains_expressions
506 and not self.connection.features.supports_expression_indexes
507 ):
508 return None
509 self.execute(index.remove_sql(model, self))
510
511 def rename_index(
512 self, model: type[Model], old_index: Index, new_index: Index
513 ) -> None:
514 if self.connection.features.can_rename_index:
515 self.execute(
516 self._rename_index_sql(model, old_index.name, new_index.name),
517 params=None,
518 )
519 else:
520 self.remove_index(model, old_index)
521 self.add_index(model, new_index)
522
523 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
524 """Add a constraint to a model."""
525 sql = constraint.create_sql(model, self)
526 if sql:
527 # Constraint.create_sql returns interpolated SQL which makes
528 # params=None a necessity to avoid escaping attempts on execution.
529 self.execute(sql, params=None)
530
531 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
532 """Remove a constraint from a model."""
533 sql = constraint.remove_sql(model, self)
534 if sql:
535 self.execute(sql)
536
537 def alter_db_table(
538 self, model: type[Model], old_db_table: str, new_db_table: str
539 ) -> None:
540 """Rename the table a model points to."""
541 if old_db_table == new_db_table or (
542 self.connection.features.ignores_table_name_case
543 and old_db_table.lower() == new_db_table.lower()
544 ):
545 return
546 self.execute(
547 self.sql_rename_table
548 % {
549 "old_table": self.quote_name(old_db_table),
550 "new_table": self.quote_name(new_db_table),
551 }
552 )
553 # Rename all references to the old table name.
554 for sql in self.deferred_sql:
555 if isinstance(sql, Statement):
556 sql.rename_table_references(old_db_table, new_db_table)
557
558 def alter_db_table_comment(
559 self,
560 model: type[Model],
561 old_db_table_comment: str | None,
562 new_db_table_comment: str | None,
563 ) -> None:
564 self.execute(
565 self.sql_alter_table_comment
566 % {
567 "table": self.quote_name(model.model_options.db_table),
568 "comment": self.quote_value(new_db_table_comment or ""),
569 }
570 )
571
572 def add_field(self, model: type[Model], field: Field) -> None:
573 """
574 Create a field on a model. Usually involves adding a column, but may
575 involve adding a table instead (for M2M fields).
576 """
577 # Get the column's definition
578 definition, params = self.column_sql(model, field, include_default=True)
579 # It might not actually have a column behind it
580 if definition is None:
581 return
582 if col_type_suffix := field.db_type_suffix(connection=self.connection):
583 definition += f" {col_type_suffix}"
584 # Check constraints can go on the column SQL here
585 db_params = field.db_parameters(connection=self.connection)
586 if db_params["check"]:
587 definition += " " + self.sql_check_constraint % db_params
588 if (
589 isinstance(field, ForeignKeyField)
590 and self.connection.features.supports_foreign_keys
591 and field.db_constraint
592 ):
593 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
594 # Add FK constraint inline, if supported.
595 if self.sql_create_column_inline_fk:
596 to_table = field.remote_field.model.model_options.db_table
597 field_name = field.remote_field.field_name
598 if field_name is None:
599 raise ValueError("Foreign key field_name cannot be None")
600 to_field = field.remote_field.model._model_meta.get_forward_field(
601 field_name
602 )
603 to_column = to_field.column
604 namespace, _ = split_identifier(model.model_options.db_table)
605 definition += " " + self.sql_create_column_inline_fk % {
606 "name": self._fk_constraint_name(model, field, constraint_suffix),
607 "namespace": f"{self.quote_name(namespace)}." if namespace else "",
608 "column": self.quote_name(field.column),
609 "to_table": self.quote_name(to_table),
610 "to_column": self.quote_name(to_column),
611 "deferrable": self.connection.ops.deferrable_sql(),
612 }
613 # Otherwise, add FK constraints later.
614 else:
615 self.deferred_sql.append(
616 self._create_fk_sql(model, field, constraint_suffix)
617 )
618 # Build the SQL and run it
619 sql = self.sql_create_column % {
620 "table": self.quote_name(model.model_options.db_table),
621 "column": self.quote_name(field.column),
622 "definition": definition,
623 }
624 self.execute(sql, params)
625 # Drop the default if we need to
626 # (Plain usually does not use in-database defaults)
627 if (
628 not self.skip_default_on_alter(field)
629 and self.effective_default(field) is not None
630 ):
631 changes_sql, params = self._alter_column_default_sql(
632 model, None, field, drop=True
633 )
634 sql = self.sql_alter_column % {
635 "table": self.quote_name(model.model_options.db_table),
636 "changes": changes_sql,
637 }
638 self.execute(sql, params)
639 # Add field comment, if required.
640 if (
641 field.db_comment
642 and self.connection.features.supports_comments
643 and not self.connection.features.supports_comments_inline
644 ):
645 field_type = db_params["type"]
646 self.execute(
647 *self._alter_column_comment_sql(
648 model, field, field_type, field.db_comment
649 )
650 )
651 # Add an index, if required
652 self.deferred_sql.extend(self._field_indexes_sql(model, field))
653 # Reset connection if required
654 if self.connection.features.connection_persists_old_columns:
655 self.connection.close()
656
657 def remove_field(self, model: type[Model], field: Field) -> None:
658 """
659 Remove a field from a model. Usually involves deleting a column,
660 but for M2Ms may involve deleting a table.
661 """
662 # It might not actually have a column behind it
663 if field.db_parameters(connection=self.connection)["type"] is None:
664 return
665 # Drop any FK constraints, MySQL requires explicit deletion
666 if isinstance(field, RelatedField):
667 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
668 for fk_name in fk_names:
669 self.execute(self._delete_fk_sql(model, fk_name))
670 # Delete the column
671 sql = self.sql_delete_column % {
672 "table": self.quote_name(model.model_options.db_table),
673 "column": self.quote_name(field.column),
674 }
675 self.execute(sql)
676 # Reset connection if required
677 if self.connection.features.connection_persists_old_columns:
678 self.connection.close()
679 # Remove all deferred statements referencing the deleted column.
680 for sql in list(self.deferred_sql):
681 if isinstance(sql, Statement) and sql.references_column(
682 model.model_options.db_table, field.column
683 ):
684 self.deferred_sql.remove(sql)
685
686 def alter_field(
687 self,
688 model: type[Model],
689 old_field: Field,
690 new_field: Field,
691 strict: bool = False,
692 ) -> None:
693 """
694 Allow a field's type, uniqueness, nullability, default, column,
695 constraints, etc. to be modified.
696 `old_field` is required to compute the necessary changes.
697 If `strict` is True, raise errors if the old column does not match
698 `old_field` precisely.
699 """
700 if not self._field_should_be_altered(old_field, new_field):
701 return
702 # Ensure this field is even column-based
703 old_db_params = old_field.db_parameters(connection=self.connection)
704 old_type = old_db_params["type"]
705 new_db_params = new_field.db_parameters(connection=self.connection)
706 new_type = new_db_params["type"]
707 if (old_type is None and not isinstance(old_field, RelatedField)) or (
708 new_type is None and not isinstance(new_field, RelatedField)
709 ):
710 raise ValueError(
711 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
712 "db_type (are you using a badly-written custom field?)",
713 )
714 elif (
715 old_type is None
716 and new_type is None
717 and isinstance(old_field, RelatedField)
718 and isinstance(old_field.remote_field, ManyToManyRel)
719 and isinstance(new_field, RelatedField)
720 and isinstance(new_field.remote_field, ManyToManyRel)
721 ):
722 # Both sides have through models; this is a no-op.
723 return
724 elif old_type is None or new_type is None:
725 raise ValueError(
726 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
727 "(you cannot alter to or from M2M fields, or add or remove "
728 "through= on M2M fields)"
729 )
730
731 self._alter_field(
732 model,
733 old_field,
734 new_field,
735 old_type,
736 new_type,
737 old_db_params,
738 new_db_params,
739 strict,
740 )
741
742 def _field_db_check(
743 self, field: Field, field_db_params: dict[str, Any]
744 ) -> str | None:
745 # Always check constraints with the same mocked column name to avoid
746 # recreating constrains when the column is renamed.
747 check_constraints = self.connection.data_type_check_constraints
748 data = field.db_type_parameters(self.connection)
749 data["column"] = "__column_name__"
750 try:
751 return check_constraints[field.get_internal_type()] % data
752 except KeyError:
753 return None
754
755 def _alter_field(
756 self,
757 model: type[Model],
758 old_field: Field,
759 new_field: Field,
760 old_type: str,
761 new_type: str,
762 old_db_params: dict[str, Any],
763 new_db_params: dict[str, Any],
764 strict: bool = False,
765 ) -> None:
766 """Perform a "physical" (non-ManyToMany) field update."""
767 # Drop any FK constraints, we'll remake them later
768 fks_dropped = set()
769 if (
770 self.connection.features.supports_foreign_keys
771 and isinstance(old_field, ForeignKeyField)
772 and old_field.db_constraint
773 and self._field_should_be_altered(
774 old_field,
775 new_field,
776 ignore={"db_comment"},
777 )
778 ):
779 fk_names = self._constraint_names(
780 model, [old_field.column], foreign_key=True
781 )
782 if strict and len(fk_names) != 1:
783 raise ValueError(
784 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
785 )
786 for fk_name in fk_names:
787 fks_dropped.add((old_field.column,))
788 self.execute(self._delete_fk_sql(model, fk_name))
789 # Has unique been removed?
790 if old_field.primary_key and (
791 not new_field.primary_key
792 or self._field_became_primary_key(old_field, new_field)
793 ):
794 # Find the unique constraint for this field
795 meta_constraint_names = {
796 constraint.name for constraint in model.model_options.constraints
797 }
798 constraint_names = self._constraint_names(
799 model,
800 [old_field.column],
801 unique=True,
802 primary_key=False,
803 exclude=meta_constraint_names,
804 )
805 if strict and len(constraint_names) != 1:
806 raise ValueError(
807 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
808 )
809 for constraint_name in constraint_names:
810 sql = self._delete_unique_sql(model, constraint_name)
811 if sql is not None:
812 self.execute(sql)
813 # Drop incoming FK constraints if the field is a primary key or unique,
814 # which might be a to_field target, and things are going to change.
815 old_collation = old_db_params.get("collation")
816 new_collation = new_db_params.get("collation")
817 drop_foreign_keys = (
818 self.connection.features.supports_foreign_keys
819 and (old_field.primary_key and new_field.primary_key)
820 and ((old_type != new_type) or (old_collation != new_collation))
821 )
822 if drop_foreign_keys:
823 # '_model_meta.related_field' also contains M2M reverse fields, these
824 # will be filtered out
825 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
826 rel_fk_names = self._constraint_names(
827 new_rel.related_model, [new_rel.field.column], foreign_key=True
828 )
829 for fk_name in rel_fk_names:
830 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
831 # Removed an index? (no strict check, as multiple indexes are possible)
832 # Remove indexes if db_index switched to False or a unique constraint
833 # will now be used in lieu of an index. The following lines from the
834 # truth table show all True cases; the rest are False:
835 #
836 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
837 # ------------------------------------------------------------------------------
838 # True | False | False | False
839 # True | False | False | True
840 # True | False | True | True
841 if (
842 isinstance(old_field, ForeignKeyField)
843 and old_field.db_index
844 and not old_field.primary_key
845 and (
846 not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
847 or new_field.primary_key
848 )
849 ):
850 # Find the index for this field
851 meta_index_names = {index.name for index in model.model_options.indexes}
852 # Retrieve only BTREE indexes since this is what's created with
853 # db_index=True.
854 index_names = self._constraint_names(
855 model,
856 [old_field.column],
857 index=True,
858 type_=Index.suffix,
859 exclude=meta_index_names,
860 )
861 for index_name in index_names:
862 # The only way to check if an index was created with
863 # db_index=True or with Index(['field'], name='foo')
864 # is to look at its name (refs #28053).
865 self.execute(self._delete_index_sql(model, index_name))
866 # Change check constraints?
867 old_db_check = self._field_db_check(old_field, old_db_params)
868 new_db_check = self._field_db_check(new_field, new_db_params)
869 if old_db_check != new_db_check and old_db_check:
870 meta_constraint_names = {
871 constraint.name for constraint in model.model_options.constraints
872 }
873 constraint_names = self._constraint_names(
874 model,
875 [old_field.column],
876 check=True,
877 exclude=meta_constraint_names,
878 )
879 if strict and len(constraint_names) != 1:
880 raise ValueError(
881 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
882 )
883 for constraint_name in constraint_names:
884 sql = self._delete_check_sql(model, constraint_name)
885 if sql is not None:
886 self.execute(sql)
887 # Have they renamed the column?
888 if old_field.column != new_field.column:
889 self.execute(
890 self._rename_field_sql(
891 model.model_options.db_table, old_field, new_field, new_type
892 )
893 )
894 # Rename all references to the renamed column.
895 for sql in self.deferred_sql:
896 if isinstance(sql, Statement):
897 sql.rename_column_references(
898 model.model_options.db_table, old_field.column, new_field.column
899 )
900 # Next, start accumulating actions to do
901 actions = []
902 null_actions = []
903 post_actions = []
904 # Type suffix change? (e.g. auto increment).
905 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
906 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
907 # Type, collation, or comment change?
908 if (
909 old_type != new_type
910 or old_type_suffix != new_type_suffix
911 or old_collation != new_collation
912 or (
913 self.connection.features.supports_comments
914 and old_field.db_comment != new_field.db_comment
915 )
916 ):
917 fragment, other_actions = self._alter_column_type_sql(
918 model, old_field, new_field, new_type, old_collation, new_collation
919 )
920 actions.append(fragment)
921 post_actions.extend(other_actions)
922 # When changing a column NULL constraint to NOT NULL with a given
923 # default value, we need to perform 4 steps:
924 # 1. Add a default for new incoming writes
925 # 2. Update existing NULL rows with new default
926 # 3. Replace NULL constraint with NOT NULL
927 # 4. Drop the default again.
928 # Default change?
929 needs_database_default = False
930 if old_field.allow_null and not new_field.allow_null:
931 old_default = self.effective_default(old_field)
932 new_default = self.effective_default(new_field)
933 if (
934 not self.skip_default_on_alter(new_field)
935 and old_default != new_default
936 and new_default is not None
937 ):
938 needs_database_default = True
939 actions.append(
940 self._alter_column_default_sql(model, old_field, new_field)
941 )
942 # Nullability change?
943 if old_field.allow_null != new_field.allow_null:
944 fragment = self._alter_column_null_sql(model, old_field, new_field)
945 if fragment:
946 null_actions.append(fragment)
947 # Only if we have a default and there is a change from NULL to NOT NULL
948 four_way_default_alteration = new_field.has_default() and (
949 old_field.allow_null and not new_field.allow_null
950 )
951 if actions or null_actions:
952 if not four_way_default_alteration:
953 # If we don't have to do a 4-way default alteration we can
954 # directly run a (NOT) NULL alteration
955 actions += null_actions
956 # Combine actions together if we can (e.g. postgres)
957 if self.connection.features.supports_combined_alters and actions:
958 sql, params = tuple(zip(*actions))
959 actions = [(", ".join(sql), sum(params, []))]
960 # Apply those actions
961 for sql, params in actions:
962 self.execute(
963 self.sql_alter_column
964 % {
965 "table": self.quote_name(model.model_options.db_table),
966 "changes": sql,
967 },
968 params,
969 )
970 if four_way_default_alteration:
971 # Update existing rows with default value
972 self.execute(
973 self.sql_update_with_default
974 % {
975 "table": self.quote_name(model.model_options.db_table),
976 "column": self.quote_name(new_field.column),
977 "default": "%s",
978 },
979 [new_default],
980 )
981 # Since we didn't run a NOT NULL change before we need to do it
982 # now
983 for sql, params in null_actions:
984 self.execute(
985 self.sql_alter_column
986 % {
987 "table": self.quote_name(model.model_options.db_table),
988 "changes": sql,
989 },
990 params,
991 )
992 if post_actions:
993 for sql, params in post_actions:
994 self.execute(sql, params)
995 # If primary_key changed to False, delete the primary key constraint.
996 if old_field.primary_key and not new_field.primary_key:
997 self._delete_primary_key(model, strict)
998
999 # Added an index? Add an index if db_index switched to True or a unique
1000 # constraint will no longer be used in lieu of an index. The following
1001 # lines from the truth table show all True cases; the rest are False:
1002 #
1003 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
1004 # ------------------------------------------------------------------------------
1005 # False | False | True | False
1006 # False | True | True | False
1007 # True | True | True | False
1008 if (
1009 (
1010 not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
1011 or old_field.primary_key
1012 )
1013 and isinstance(new_field, ForeignKeyField)
1014 and new_field.db_index
1015 and not new_field.primary_key
1016 ):
1017 self.execute(self._create_index_sql(model, fields=[new_field]))
1018 # Type alteration on primary key? Then we need to alter the column
1019 # referring to us.
1020 rels_to_update = []
1021 if drop_foreign_keys:
1022 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1023 # Changed to become primary key?
1024 if self._field_became_primary_key(old_field, new_field):
1025 # Make the new one
1026 self.execute(self._create_primary_key_sql(model, new_field))
1027 # Update all referencing columns
1028 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1029 # Handle our type alters on the other end of rels from the PK stuff above
1030 for old_rel, new_rel in rels_to_update:
1031 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1032 rel_type = rel_db_params["type"]
1033 rel_collation = rel_db_params.get("collation")
1034 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1035 old_rel_collation = old_rel_db_params.get("collation")
1036 fragment, other_actions = self._alter_column_type_sql(
1037 new_rel.related_model,
1038 old_rel.field,
1039 new_rel.field,
1040 rel_type,
1041 old_rel_collation,
1042 rel_collation,
1043 )
1044 self.execute(
1045 self.sql_alter_column
1046 % {
1047 "table": self.quote_name(
1048 new_rel.related_model.model_options.db_table
1049 ),
1050 "changes": fragment[0],
1051 },
1052 fragment[1],
1053 )
1054 for sql, params in other_actions:
1055 self.execute(sql, params)
1056 # Does it have a foreign key?
1057 if (
1058 self.connection.features.supports_foreign_keys
1059 and isinstance(new_field, ForeignKeyField)
1060 and (
1061 fks_dropped
1062 or not isinstance(old_field, ForeignKeyField)
1063 or not old_field.db_constraint
1064 )
1065 and new_field.db_constraint
1066 ):
1067 self.execute(
1068 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1069 )
1070 # Rebuild FKs that pointed to us if we previously had to drop them
1071 if drop_foreign_keys:
1072 for _, rel in rels_to_update:
1073 if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
1074 self.execute(
1075 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1076 )
1077 # Does it have check constraints we need to add?
1078 if old_db_check != new_db_check and new_db_check:
1079 constraint_name = self._create_index_name(
1080 model.model_options.db_table, [new_field.column], suffix="_check"
1081 )
1082 sql = self._create_check_sql(model, constraint_name, new_db_params["check"])
1083 if sql is not None:
1084 self.execute(sql)
1085 # Drop the default if we need to
1086 # (Plain usually does not use in-database defaults)
1087 if needs_database_default:
1088 changes_sql, params = self._alter_column_default_sql(
1089 model, old_field, new_field, drop=True
1090 )
1091 sql = self.sql_alter_column % {
1092 "table": self.quote_name(model.model_options.db_table),
1093 "changes": changes_sql,
1094 }
1095 self.execute(sql, params)
1096 # Reset connection if required
1097 if self.connection.features.connection_persists_old_columns:
1098 self.connection.close()
1099
1100 def _alter_column_null_sql(
1101 self, model: type[Model], old_field: Field, new_field: Field
1102 ) -> tuple[str, list[Any]]:
1103 """
1104 Hook to specialize column null alteration.
1105
1106 Return a (sql, params) fragment to set a column to null or non-null
1107 as required by new_field, or None if no changes are required.
1108 """
1109 new_db_params = new_field.db_parameters(connection=self.connection)
1110 sql = (
1111 self.sql_alter_column_null
1112 if new_field.allow_null
1113 else self.sql_alter_column_not_null
1114 )
1115 return (
1116 sql
1117 % {
1118 "column": self.quote_name(new_field.column),
1119 "type": new_db_params["type"],
1120 },
1121 [],
1122 )
1123
1124 def _alter_column_default_sql(
1125 self,
1126 model: type[Model],
1127 old_field: Field | None,
1128 new_field: Field,
1129 drop: bool = False,
1130 ) -> tuple[str, list[Any]]:
1131 """
1132 Hook to specialize column default alteration.
1133
1134 Return a (sql, params) fragment to add or drop (depending on the drop
1135 argument) a default to new_field's column.
1136 """
1137 new_default = self.effective_default(new_field)
1138 default = self._column_default_sql(new_field)
1139 params = [new_default]
1140
1141 if drop:
1142 params = []
1143 elif self.connection.features.requires_literal_defaults:
1144 # Some databases (Oracle) can't take defaults as a parameter
1145 # If this is the case, the SchemaEditor for that database should
1146 # implement prepare_default().
1147 default = self.prepare_default(new_default)
1148 params = []
1149
1150 new_db_params = new_field.db_parameters(connection=self.connection)
1151 if drop:
1152 if new_field.allow_null:
1153 sql = self.sql_alter_column_no_default_null
1154 else:
1155 sql = self.sql_alter_column_no_default
1156 else:
1157 sql = self.sql_alter_column_default
1158 return (
1159 sql
1160 % {
1161 "column": self.quote_name(new_field.column),
1162 "type": new_db_params["type"],
1163 "default": default,
1164 },
1165 params,
1166 )
1167
1168 def _alter_column_type_sql(
1169 self,
1170 model: type[Model],
1171 old_field: Field,
1172 new_field: Field,
1173 new_type: str,
1174 old_collation: str | None,
1175 new_collation: str | None,
1176 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1177 """
1178 Hook to specialize column type alteration for different backends,
1179 for cases when a creation type is different to an alteration type
1180 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1181
1182 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1183 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1184 run once the field is altered.
1185 """
1186 other_actions = []
1187 if collate_sql := self._collate_sql(
1188 new_collation, old_collation, model.model_options.db_table
1189 ):
1190 collate_sql = f" {collate_sql}"
1191 else:
1192 collate_sql = ""
1193 # Comment change?
1194 from plain.models.fields.related import ManyToManyField
1195
1196 comment_sql = ""
1197 if self.connection.features.supports_comments and not isinstance(
1198 new_field, ManyToManyField
1199 ):
1200 if old_field.db_comment != new_field.db_comment:
1201 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1202 # 'COMMENT ON ...' at the same time.
1203 sql, params = self._alter_column_comment_sql(
1204 model, new_field, new_type, new_field.db_comment
1205 )
1206 if sql:
1207 other_actions.append((sql, params))
1208 if new_field.db_comment:
1209 comment_sql = self._comment_sql(new_field.db_comment)
1210 return (
1211 (
1212 self.sql_alter_column_type
1213 % {
1214 "column": self.quote_name(new_field.column),
1215 "type": new_type,
1216 "collation": collate_sql,
1217 "comment": comment_sql,
1218 },
1219 [],
1220 ),
1221 other_actions,
1222 )
1223
1224 def _alter_column_comment_sql(
1225 self,
1226 model: type[Model],
1227 new_field: Field,
1228 new_type: str,
1229 new_db_comment: str | None,
1230 ) -> tuple[str, list[Any]]:
1231 return (
1232 self.sql_alter_column_comment
1233 % {
1234 "table": self.quote_name(model.model_options.db_table),
1235 "column": self.quote_name(new_field.column),
1236 "comment": self._comment_sql(new_db_comment),
1237 },
1238 [],
1239 )
1240
1241 def _comment_sql(self, comment: str | None) -> str:
1242 return self.quote_value(comment or "")
1243
1244 def _alter_many_to_many(
1245 self,
1246 model: type[Model],
1247 old_field: ManyToManyField,
1248 new_field: ManyToManyField,
1249 strict: bool,
1250 ) -> None:
1251 """Alter M2Ms to repoint their to= endpoints."""
1252 # Type narrow for ManyToManyField.remote_field
1253 old_rel: ManyToManyRel = old_field.remote_field
1254 new_rel: ManyToManyRel = new_field.remote_field
1255
1256 # Rename the through table
1257 if (
1258 old_rel.through.model_options.db_table
1259 != new_rel.through.model_options.db_table
1260 ):
1261 self.alter_db_table(
1262 old_rel.through,
1263 old_rel.through.model_options.db_table,
1264 new_rel.through.model_options.db_table,
1265 )
1266 # Repoint the FK to the other side
1267 old_reverse_field = old_rel.through._model_meta.get_forward_field(
1268 old_field.m2m_reverse_field_name()
1269 )
1270 new_reverse_field = new_rel.through._model_meta.get_forward_field(
1271 new_field.m2m_reverse_field_name()
1272 )
1273 self.alter_field(
1274 new_rel.through,
1275 # The field that points to the target model is needed, so we can
1276 # tell alter_field to change it - this is m2m_reverse_field_name()
1277 # (as opposed to m2m_field_name(), which points to our model).
1278 old_reverse_field,
1279 new_reverse_field,
1280 )
1281 old_m2m_field = old_rel.through._model_meta.get_forward_field(
1282 old_field.m2m_field_name()
1283 )
1284 new_m2m_field = new_rel.through._model_meta.get_forward_field(
1285 new_field.m2m_field_name()
1286 )
1287 self.alter_field(
1288 new_rel.through,
1289 # for self-referential models we need to alter field from the other end too
1290 old_m2m_field,
1291 new_m2m_field,
1292 )
1293
1294 def _create_index_name(
1295 self, table_name: str, column_names: list[str], suffix: str = ""
1296 ) -> str:
1297 """
1298 Generate a unique name for an index/unique constraint.
1299
1300 The name is divided into 3 parts: the table name, the column names,
1301 and a unique digest and suffix.
1302 """
1303 _, table_name = split_identifier(table_name)
1304 hash_suffix_part = (
1305 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1306 )
1307 max_length = self.connection.ops.max_name_length() or 200
1308 # If everything fits into max_length, use that name.
1309 index_name = "{}_{}_{}".format(
1310 table_name, "_".join(column_names), hash_suffix_part
1311 )
1312 if len(index_name) <= max_length:
1313 return index_name
1314 # Shorten a long suffix.
1315 if len(hash_suffix_part) > max_length / 3:
1316 hash_suffix_part = hash_suffix_part[: max_length // 3]
1317 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1318 index_name = "{}_{}_{}".format(
1319 table_name[:other_length],
1320 "_".join(column_names)[:other_length],
1321 hash_suffix_part,
1322 )
1323 # Prepend D if needed to prevent the name from starting with an
1324 # underscore or a number (not permitted on Oracle).
1325 if index_name[0] == "_" or index_name[0].isdigit():
1326 index_name = f"D{index_name[:-1]}"
1327 return index_name
1328
1329 def _index_condition_sql(self, condition: str | None) -> str:
1330 if condition:
1331 return " WHERE " + condition
1332 return ""
1333
1334 def _index_include_sql(
1335 self, model: type[Model], columns: list[str] | None
1336 ) -> str | Statement:
1337 if not columns or not self.connection.features.supports_covering_indexes:
1338 return ""
1339 return Statement(
1340 " INCLUDE (%(columns)s)",
1341 columns=Columns(model.model_options.db_table, columns, self.quote_name),
1342 )
1343
1344 def _create_index_sql(
1345 self,
1346 model: type[Model],
1347 *,
1348 fields: list[Field] | None = None,
1349 name: str | None = None,
1350 suffix: str = "",
1351 using: str = "",
1352 col_suffixes: tuple[str, ...] = (),
1353 sql: str | None = None,
1354 opclasses: tuple[str, ...] = (),
1355 condition: str | None = None,
1356 include: list[str] | None = None,
1357 expressions: Any = None,
1358 ) -> Statement:
1359 """
1360 Return the SQL statement to create the index for one or several fields
1361 or expressions. `sql` can be specified if the syntax differs from the
1362 standard (GIS indexes, ...).
1363 """
1364 fields = fields or []
1365 expressions = expressions or []
1366 compiler = Query(model, alias_cols=False).get_compiler()
1367 columns = [field.column for field in fields]
1368 sql_create_index = sql or self.sql_create_index
1369 table = model.model_options.db_table
1370
1371 def create_index_name(*args: Any, **kwargs: Any) -> str:
1372 nonlocal name
1373 if name is None:
1374 name = self._create_index_name(*args, **kwargs)
1375 return self.quote_name(name)
1376
1377 return Statement(
1378 sql_create_index,
1379 table=Table(table, self.quote_name),
1380 name=IndexName(table, columns, suffix, create_index_name),
1381 using=using,
1382 columns=(
1383 self._index_columns(table, columns, col_suffixes, opclasses)
1384 if columns
1385 else Expressions(table, expressions, compiler, self.quote_value)
1386 ),
1387 extra="",
1388 condition=self._index_condition_sql(condition),
1389 include=self._index_include_sql(model, include),
1390 )
1391
1392 def _delete_index_sql(
1393 self, model: type[Model], name: str, sql: str | None = None
1394 ) -> Statement:
1395 return Statement(
1396 sql or self.sql_delete_index,
1397 table=Table(model.model_options.db_table, self.quote_name),
1398 name=self.quote_name(name),
1399 )
1400
1401 def _rename_index_sql(
1402 self, model: type[Model], old_name: str, new_name: str
1403 ) -> Statement:
1404 return Statement(
1405 self.sql_rename_index,
1406 table=Table(model.model_options.db_table, self.quote_name),
1407 old_name=self.quote_name(old_name),
1408 new_name=self.quote_name(new_name),
1409 )
1410
1411 def _index_columns(
1412 self,
1413 table: str,
1414 columns: list[str],
1415 col_suffixes: tuple[str, ...],
1416 opclasses: tuple[str, ...],
1417 ) -> Columns:
1418 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1419
1420 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1421 """
1422 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1423 """
1424 output: list[Statement | None] = []
1425 for field in model._model_meta.local_fields:
1426 output.extend(self._field_indexes_sql(model, field))
1427
1428 for index in model.model_options.indexes:
1429 if (
1430 not index.contains_expressions
1431 or self.connection.features.supports_expression_indexes
1432 ):
1433 output.append(index.create_sql(model, self))
1434 return output
1435
1436 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1437 """
1438 Return a list of all index SQL statements for the specified field.
1439 """
1440 output: list[Statement] = []
1441 if self._field_should_be_indexed(model, field):
1442 output.append(self._create_index_sql(model, fields=[field]))
1443 return output
1444
1445 def _field_should_be_altered(
1446 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1447 ) -> bool:
1448 ignore = ignore or set()
1449 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1450 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1451 # Don't alter when:
1452 # - changing only a field name
1453 # - changing an attribute that doesn't affect the schema
1454 # - changing an attribute in the provided set of ignored attributes
1455 # - adding only a db_column and the column name is not changed
1456 for attr in ignore.union(old_field.non_db_attrs):
1457 old_kwargs.pop(attr, None)
1458 for attr in ignore.union(new_field.non_db_attrs):
1459 new_kwargs.pop(attr, None)
1460 return self.quote_name(old_field.column) != self.quote_name(
1461 new_field.column
1462 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1463
1464 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1465 if isinstance(field, ForeignKeyField):
1466 return bool(field.remote_field) and field.db_index and not field.primary_key
1467 return False
1468
1469 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1470 return not old_field.primary_key and new_field.primary_key
1471
1472 def _rename_field_sql(
1473 self, table: str, old_field: Field, new_field: Field, new_type: str
1474 ) -> str:
1475 return self.sql_rename_column % {
1476 "table": self.quote_name(table),
1477 "old_column": self.quote_name(old_field.column),
1478 "new_column": self.quote_name(new_field.column),
1479 "type": new_type,
1480 }
1481
1482 def _create_fk_sql(
1483 self, model: type[Model], field: ForeignKeyField, suffix: str
1484 ) -> Statement:
1485 table = Table(model.model_options.db_table, self.quote_name)
1486 name = self._fk_constraint_name(model, field, suffix)
1487 column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1488 to_table = Table(
1489 field.target_field.model.model_options.db_table, self.quote_name
1490 )
1491 to_column = Columns(
1492 field.target_field.model.model_options.db_table,
1493 [field.target_field.column],
1494 self.quote_name,
1495 )
1496 deferrable = self.connection.ops.deferrable_sql()
1497 return Statement(
1498 self.sql_create_fk,
1499 table=table,
1500 name=name,
1501 column=column,
1502 to_table=to_table,
1503 to_column=to_column,
1504 deferrable=deferrable,
1505 )
1506
1507 def _fk_constraint_name(
1508 self, model: type[Model], field: ForeignKeyField, suffix: str
1509 ) -> ForeignKeyName:
1510 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1511 return self.quote_name(self._create_index_name(*args, **kwargs))
1512
1513 return ForeignKeyName(
1514 model.model_options.db_table,
1515 [field.column],
1516 split_identifier(field.target_field.model.model_options.db_table)[1],
1517 [field.target_field.column],
1518 suffix,
1519 create_fk_name,
1520 )
1521
1522 def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1523 return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1524
1525 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1526 if deferrable is None:
1527 return ""
1528 if deferrable == Deferrable.DEFERRED:
1529 return " DEFERRABLE INITIALLY DEFERRED"
1530 if deferrable == Deferrable.IMMEDIATE:
1531 return " DEFERRABLE INITIALLY IMMEDIATE"
1532 return ""
1533
1534 def _unique_sql(
1535 self,
1536 model: type[Model],
1537 fields: Iterable[Field],
1538 name: str,
1539 condition: str | None = None,
1540 deferrable: Deferrable | None = None,
1541 include: list[str] | None = None,
1542 opclasses: tuple[str, ...] | None = None,
1543 expressions: Any = None,
1544 ) -> str | None:
1545 if (
1546 deferrable
1547 and not self.connection.features.supports_deferrable_unique_constraints
1548 ):
1549 return None
1550 if condition or include or opclasses or expressions:
1551 # Databases support conditional, covering, and functional unique
1552 # constraints via a unique index.
1553 sql = self._create_unique_sql(
1554 model,
1555 fields,
1556 name=name,
1557 condition=condition,
1558 include=include,
1559 opclasses=opclasses,
1560 expressions=expressions,
1561 )
1562 if sql:
1563 self.deferred_sql.append(sql)
1564 return None
1565 constraint = self.sql_unique_constraint % {
1566 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1567 "deferrable": self._deferrable_constraint_sql(deferrable),
1568 }
1569 return self.sql_constraint % {
1570 "name": self.quote_name(name),
1571 "constraint": constraint,
1572 }
1573
1574 def _create_unique_sql(
1575 self,
1576 model: type[Model],
1577 fields: Iterable[Field],
1578 name: str | None = None,
1579 condition: str | None = None,
1580 deferrable: Deferrable | None = None,
1581 include: list[str] | None = None,
1582 opclasses: tuple[str, ...] | None = None,
1583 expressions: Any = None,
1584 ) -> Statement | None:
1585 if (
1586 (
1587 deferrable
1588 and not self.connection.features.supports_deferrable_unique_constraints
1589 )
1590 or (condition and not self.connection.features.supports_partial_indexes)
1591 or (include and not self.connection.features.supports_covering_indexes)
1592 or (
1593 expressions and not self.connection.features.supports_expression_indexes
1594 )
1595 ):
1596 return None
1597
1598 compiler = Query(model, alias_cols=False).get_compiler()
1599 table = model.model_options.db_table
1600 columns = [field.column for field in fields]
1601 constraint_name: IndexName | str
1602 if name is None:
1603 constraint_name = self._unique_constraint_name(table, columns, quote=True)
1604 else:
1605 constraint_name = self.quote_name(name)
1606 if condition or include or opclasses or expressions:
1607 sql = self.sql_create_unique_index
1608 else:
1609 sql = self.sql_create_unique
1610 if columns:
1611 columns_obj: Columns | Expressions = self._index_columns(
1612 table, columns, col_suffixes=(), opclasses=opclasses or ()
1613 )
1614 else:
1615 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1616 return Statement(
1617 sql,
1618 table=Table(table, self.quote_name),
1619 name=constraint_name,
1620 columns=columns_obj,
1621 condition=self._index_condition_sql(condition),
1622 deferrable=self._deferrable_constraint_sql(deferrable),
1623 include=self._index_include_sql(model, include),
1624 )
1625
1626 def _unique_constraint_name(
1627 self, table: str, columns: list[str], quote: bool = True
1628 ) -> IndexName | str:
1629 if quote:
1630
1631 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1632 return self.quote_name(self._create_index_name(*args, **kwargs))
1633
1634 else:
1635 create_unique_name = self._create_index_name
1636
1637 return IndexName(table, columns, "_uniq", create_unique_name)
1638
1639 def _delete_unique_sql(
1640 self,
1641 model: type[Model],
1642 name: str,
1643 condition: str | None = None,
1644 deferrable: Deferrable | None = None,
1645 include: list[str] | None = None,
1646 opclasses: tuple[str, ...] | None = None,
1647 expressions: Any = None,
1648 ) -> Statement | None:
1649 if (
1650 (
1651 deferrable
1652 and not self.connection.features.supports_deferrable_unique_constraints
1653 )
1654 or (condition and not self.connection.features.supports_partial_indexes)
1655 or (include and not self.connection.features.supports_covering_indexes)
1656 or (
1657 expressions and not self.connection.features.supports_expression_indexes
1658 )
1659 ):
1660 return None
1661 if condition or include or opclasses or expressions:
1662 sql = self.sql_delete_index
1663 else:
1664 sql = self.sql_delete_unique
1665 return self._delete_constraint_sql(sql, model, name)
1666
1667 def _check_sql(self, name: str, check: str) -> str:
1668 return self.sql_constraint % {
1669 "name": self.quote_name(name),
1670 "constraint": self.sql_check_constraint % {"check": check},
1671 }
1672
1673 def _create_check_sql(
1674 self, model: type[Model], name: str, check: str
1675 ) -> Statement | None:
1676 if not self.connection.features.supports_table_check_constraints:
1677 return None
1678 return Statement(
1679 self.sql_create_check,
1680 table=Table(model.model_options.db_table, self.quote_name),
1681 name=self.quote_name(name),
1682 check=check,
1683 )
1684
1685 def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1686 if not self.connection.features.supports_table_check_constraints:
1687 return None
1688 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1689
1690 def _delete_constraint_sql(
1691 self, template: str, model: type[Model], name: str
1692 ) -> Statement:
1693 return Statement(
1694 template,
1695 table=Table(model.model_options.db_table, self.quote_name),
1696 name=self.quote_name(name),
1697 )
1698
1699 def _constraint_names(
1700 self,
1701 model: type[Model],
1702 column_names: list[str] | None = None,
1703 unique: bool | None = None,
1704 primary_key: bool | None = None,
1705 index: bool | None = None,
1706 foreign_key: bool | None = None,
1707 check: bool | None = None,
1708 type_: str | None = None,
1709 exclude: set[str] | None = None,
1710 ) -> list[str]:
1711 """Return all constraint names matching the columns and conditions."""
1712 if column_names is not None:
1713 column_names = [
1714 self.connection.introspection.identifier_converter(
1715 truncate_name(name, self.connection.ops.max_name_length())
1716 )
1717 if self.connection.features.truncates_names
1718 else self.connection.introspection.identifier_converter(name)
1719 for name in column_names
1720 ]
1721 with self.connection.cursor() as cursor:
1722 constraints = self.connection.introspection.get_constraints(
1723 cursor, model.model_options.db_table
1724 )
1725 result: list[str] = []
1726 for name, infodict in constraints.items():
1727 if column_names is None or column_names == infodict["columns"]:
1728 if unique is not None and infodict["unique"] != unique:
1729 continue
1730 if primary_key is not None and infodict["primary_key"] != primary_key:
1731 continue
1732 if index is not None and infodict["index"] != index:
1733 continue
1734 if check is not None and infodict["check"] != check:
1735 continue
1736 if foreign_key is not None and not infodict["foreign_key"]:
1737 continue
1738 if type_ is not None and infodict["type"] != type_:
1739 continue
1740 if not exclude or name not in exclude:
1741 result.append(name)
1742 return result
1743
1744 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1745 constraint_names = self._constraint_names(model, primary_key=True)
1746 if strict and len(constraint_names) != 1:
1747 raise ValueError(
1748 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1749 )
1750 for constraint_name in constraint_names:
1751 self.execute(self._delete_primary_key_sql(model, constraint_name))
1752
1753 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1754 return Statement(
1755 self.sql_create_pk,
1756 table=Table(model.model_options.db_table, self.quote_name),
1757 name=self.quote_name(
1758 self._create_index_name(
1759 model.model_options.db_table, [field.column], suffix="_pk"
1760 )
1761 ),
1762 columns=Columns(
1763 model.model_options.db_table, [field.column], self.quote_name
1764 ),
1765 )
1766
1767 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1768 return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1769
1770 def _collate_sql(
1771 self,
1772 collation: str | None,
1773 old_collation: str | None = None,
1774 table_name: str | None = None,
1775 ) -> str:
1776 return "COLLATE " + self.quote_name(collation) if collation else ""