Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import logging
   4import operator
   5from abc import ABC, abstractmethod
   6from collections.abc import Generator
   7from datetime import datetime
   8from typing import TYPE_CHECKING, Any
   9
  10if TYPE_CHECKING:
  11    from typing import Self
  12
  13from plain.models.backends.ddl_references import (
  14    Columns,
  15    Expressions,
  16    ForeignKeyName,
  17    IndexName,
  18    Statement,
  19    Table,
  20)
  21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  22from plain.models.constraints import Deferrable
  23from plain.models.indexes import Index
  24from plain.models.sql import Query
  25from plain.models.transaction import TransactionManagementError, atomic
  26from plain.utils import timezone
  27
  28if TYPE_CHECKING:
  29    from collections.abc import Iterable
  30
  31    from plain.models.backends.base.base import BaseDatabaseWrapper
  32    from plain.models.base import Model
  33    from plain.models.constraints import BaseConstraint
  34    from plain.models.fields import Field
  35    from plain.models.fields.related import ForeignKey, ManyToManyField
  36    from plain.models.fields.reverse_related import ManyToManyRel
  37
  38logger = logging.getLogger("plain.models.backends.schema")
  39
  40
  41def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
  42    """
  43    When altering the given field, must constraints on its model from the given
  44    relation be temporarily dropped?
  45    """
  46    field = relation.field
  47    if field.many_to_many:
  48        # M2M reverse field
  49        return False
  50    if altered_field.primary_key:
  51        # Foreign key constraint on the primary key, which is being altered.
  52        return True
  53    # ForeignKey always targets 'id'
  54    return altered_field.name == "id"
  55
  56
  57def _all_related_fields(model: type[Model]) -> list[Any]:
  58    # Related fields must be returned in a deterministic order.
  59    return sorted(
  60        model._model_meta._get_fields(
  61            forward=False,
  62            reverse=True,
  63        ),
  64        key=operator.attrgetter("name"),
  65    )
  66
  67
  68def _related_non_m2m_objects(
  69    old_field: Field, new_field: Field
  70) -> Generator[tuple[Any, Any], None, None]:
  71    # Filter out m2m objects from reverse relations.
  72    # Return (old_relation, new_relation) tuples.
  73    related_fields = zip(
  74        (
  75            obj
  76            for obj in _all_related_fields(old_field.model)
  77            if _is_relevant_relation(obj, old_field)
  78        ),
  79        (
  80            obj
  81            for obj in _all_related_fields(new_field.model)
  82            if _is_relevant_relation(obj, new_field)
  83        ),
  84    )
  85    for old_rel, new_rel in related_fields:
  86        yield old_rel, new_rel
  87        yield from _related_non_m2m_objects(
  88            old_rel.remote_field,
  89            new_rel.remote_field,
  90        )
  91
  92
  93class BaseDatabaseSchemaEditor(ABC):
  94    """
  95    This class and its subclasses are responsible for emitting schema-changing
  96    statements to the databases - model creation/removal/alteration, field
  97    renaming, index fiddling, and so on.
  98    """
  99
    # Overrideable SQL templates. Each one is a %-style format string whose
    # %(name)s placeholders are filled in by the methods of this class.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be substituted into the %(changes)s slot of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Same statement as sql_alter_column_no_default unless a subclass overrides it.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic "drop constraint" statement that the
    # sql_delete_* aliases below reuse.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints.
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints.
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The two inline templates are None here; table_sql() and
    # add_field() only emit an inline FK clause when the corresponding
    # template is truthy, and otherwise queue the FK in deferred_sql.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # Table and column comments.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 161
 162    def __init__(
 163        self,
 164        connection: BaseDatabaseWrapper,
 165        atomic: bool = True,
 166    ):
 167        self.connection = connection
 168        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 169
 170    # State-managing methods
 171
 172    def __enter__(self) -> Self:
 173        self.deferred_sql: list[Any] = []
 174        self.executed_sql: list[str] = []
 175        if self.atomic_migration:
 176            self.atomic = atomic()
 177            self.atomic.__enter__()
 178        return self
 179
 180    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 181        if exc_type is None:
 182            for sql in self.deferred_sql:
 183                self.execute(sql)
 184        if self.atomic_migration:
 185            self.atomic.__exit__(exc_type, exc_value, traceback)
 186
 187    # Core utility functions
 188
 189    def execute(
 190        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 191    ) -> None:
 192        """Execute the given SQL statement, with optional parameters."""
 193        if (
 194            self.connection.in_atomic_block
 195            and not self.connection.features.can_rollback_ddl
 196        ):
 197            raise TransactionManagementError(
 198                "Executing DDL statements while in a transaction on databases "
 199                "that can't perform a rollback is prohibited."
 200            )
 201        # Account for non-string statement objects.
 202        sql = str(sql)
 203        # Log the command we're running, then run it
 204        logger.debug(
 205            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 206        )
 207
 208        # Track executed SQL for display in migration output
 209        # Store the SQL for display (interpolate params for readability)
 210        if params:
 211            self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
 212        else:
 213            self.executed_sql.append(sql)
 214
 215        with self.connection.cursor() as cursor:
 216            cursor.execute(sql, params)
 217
 218    def quote_name(self, name: str) -> str:
 219        return self.connection.ops.quote_name(name)
 220
    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
        """
        Take a model and return its table definition.

        Returns a `(sql, params)` pair: the CREATE TABLE statement built from
        `sql_create_table`, and the parameters collected from the column
        definitions. As a side effect, foreign key statements that cannot be
        written inline and any autoincrement SQL are appended to
        `self.deferred_sql`.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._model_meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            # column_sql() returns None for fields without a database column.
            if definition is None:
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += f" {col_type_suffix}"
            if extra_params:
                params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:  # type: ignore[attr-defined]
                # After checking these attributes, we know field is a ForeignKey
                fk_field: ForeignKey = field  # type: ignore[assignment]
                assert fk_field.remote_field is not None  # Checked in if condition
                to_table = fk_field.remote_field.model.model_options.db_table
                to_column = fk_field.remote_field.model._model_meta.get_field(
                    fk_field.remote_field.field_name
                ).column
                # Prefer an inline REFERENCES clause when the backend defines a
                # template for it; otherwise queue a separate FK statement.
                if self.sql_create_inline_fk:
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, fk_field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in ("PrimaryKeyField",):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model.model_options.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        # Table-level constraints follow the columns in the definition list.
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model.model_options.constraints
        ]
        sql = self.sql_create_table % {
            "table": self.quote_name(model.model_options.db_table),
            # Falsy entries (e.g. a constraint that produced no SQL) are
            # dropped before joining.
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        return sql, params
 284
 285    # Field <-> database mapping functions
 286
 287    def _iter_column_sql(
 288        self,
 289        column_db_type: str,
 290        params: list[Any],
 291        model: type[Model],
 292        field: Field,
 293        field_db_params: dict[str, Any],
 294        include_default: bool,
 295    ) -> Generator[str, None, None]:
 296        yield column_db_type
 297        if collation := field_db_params.get("collation"):
 298            yield self._collate_sql(collation)
 299        if self.connection.features.supports_comments_inline and field.db_comment:
 300            yield self._comment_sql(field.db_comment)
 301        # Work out nullability.
 302        null = field.allow_null
 303        # Include a default value, if requested.
 304        include_default = (
 305            include_default
 306            and not self.skip_default(field)
 307            and
 308            # Don't include a default value if it's a nullable field and the
 309            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 310            # MySQL longtext and longblob).
 311            not (null and self.skip_default_on_alter(field))
 312        )
 313        if include_default:
 314            default_value = self.effective_default(field)
 315            if default_value is not None:
 316                column_default = "DEFAULT " + self._column_default_sql(field)
 317                if self.connection.features.requires_literal_defaults:
 318                    # Some databases can't take defaults as a parameter (Oracle).
 319                    # If this is the case, the individual schema backend should
 320                    # implement prepare_default().
 321                    yield column_default % self.prepare_default(default_value)
 322                else:
 323                    yield column_default
 324                    params.append(default_value)
 325
 326        if not null:
 327            yield "NOT NULL"
 328        elif not self.connection.features.implied_column_null:
 329            yield "NULL"
 330
 331        if field.primary_key:
 332            yield "PRIMARY KEY"
 333
 334    def column_sql(
 335        self, model: type[Model], field: Field, include_default: bool = False
 336    ) -> tuple[str | None, list[Any] | None]:
 337        """
 338        Return the column definition for a field. The field must already have
 339        had set_attributes_from_name() called.
 340        """
 341        # Get the column's type and use that as the basis of the SQL.
 342        field_db_params = field.db_parameters(connection=self.connection)
 343        column_db_type = field_db_params["type"]
 344        # Check for fields that aren't actually columns (e.g. M2M).
 345        if column_db_type is None:
 346            return None, None
 347        params: list[Any] = []
 348        return (
 349            " ".join(
 350                # This appends to the params being returned.
 351                self._iter_column_sql(
 352                    column_db_type,
 353                    params,
 354                    model,
 355                    field,
 356                    field_db_params,
 357                    include_default,
 358                )
 359            ),
 360            params,
 361        )
 362
 363    def skip_default(self, field: Field) -> bool:
 364        """
 365        Some backends don't accept default values for certain columns types
 366        (i.e. MySQL longtext and longblob).
 367        """
 368        return False
 369
 370    def skip_default_on_alter(self, field: Field) -> bool:
 371        """
 372        Some backends don't accept default values for certain columns types
 373        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 374        """
 375        return False
 376
 377    def prepare_default(self, value: Any) -> str:
 378        """
 379        Only used for backends which have requires_literal_defaults feature
 380        """
 381        raise NotImplementedError(
 382            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 383            "requires_literal_defaults must provide a prepare_default() method"
 384        )
 385
 386    def _column_default_sql(self, field: Field) -> str:
 387        """
 388        Return the SQL to use in a DEFAULT clause. The resulting string should
 389        contain a '%s' placeholder for a default value.
 390        """
 391        return "%s"
 392
 393    @staticmethod
 394    def _effective_default(field: Field) -> Any:
 395        # This method allows testing its logic without a connection.
 396        if field.has_default():
 397            default = field.get_default()
 398        elif (
 399            not field.allow_null and not field.required and field.empty_strings_allowed
 400        ):
 401            if field.get_internal_type() == "BinaryField":
 402                default = b""
 403            else:
 404                default = ""
 405        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 406            internal_type = field.get_internal_type()
 407            if internal_type == "DateTimeField":
 408                default = timezone.now()
 409            else:
 410                default = datetime.now()
 411                if internal_type == "DateField":
 412                    default = default.date()
 413                elif internal_type == "TimeField":
 414                    default = default.time()
 415        else:
 416            default = None
 417        return default
 418
 419    def effective_default(self, field: Field) -> Any:
 420        """Return a field's effective database default value."""
 421        return field.get_db_prep_save(self._effective_default(field), self.connection)
 422
 423    @abstractmethod
 424    def quote_value(self, value: Any) -> str:
 425        """
 426        Return a quoted version of the value so it's safe to use in an SQL
 427        string. This is not safe against injection from user code; it is
 428        intended only for use in making SQL scripts or preparing default values
 429        for particularly tricky backends (defaults are not user-defined, though,
 430        so this is safe).
 431        """
 432        ...
 433
 434    # Actions
 435
 436    def create_model(self, model: type[Model]) -> None:
 437        """
 438        Create a table and any accompanying indexes or unique constraints for
 439        the given `model`.
 440        """
 441        sql, params = self.table_sql(model)
 442        # Prevent using [] as params, in the case a literal '%' is used in the
 443        # definition.
 444        self.execute(sql, params or None)
 445
 446        if self.connection.features.supports_comments:
 447            # Add table comment.
 448            if model.model_options.db_table_comment:
 449                self.alter_db_table_comment(
 450                    model, None, model.model_options.db_table_comment
 451                )
 452            # Add column comments.
 453            if not self.connection.features.supports_comments_inline:
 454                for field in model._model_meta.local_fields:
 455                    if field.db_comment:
 456                        field_db_params = field.db_parameters(
 457                            connection=self.connection
 458                        )
 459                        field_type = field_db_params["type"]
 460                        self.execute(
 461                            *self._alter_column_comment_sql(
 462                                model, field, field_type, field.db_comment
 463                            )
 464                        )
 465        # Add any field index (deferred as SQLite _remake_table needs it).
 466        self.deferred_sql.extend(self._model_indexes_sql(model))
 467
 468    def delete_model(self, model: type[Model]) -> None:
 469        """Delete a model from the database."""
 470
 471        # Delete the table
 472        self.execute(
 473            self.sql_delete_table
 474            % {
 475                "table": self.quote_name(model.model_options.db_table),
 476            }
 477        )
 478        # Remove all deferred statements referencing the deleted table.
 479        for sql in list(self.deferred_sql):
 480            if isinstance(sql, Statement) and sql.references_table(
 481                model.model_options.db_table
 482            ):
 483                self.deferred_sql.remove(sql)
 484
 485    def add_index(self, model: type[Model], index: Index) -> None:
 486        """Add an index on a model."""
 487        if (
 488            index.contains_expressions
 489            and not self.connection.features.supports_expression_indexes
 490        ):
 491            return None
 492        # Index.create_sql returns interpolated SQL which makes params=None a
 493        # necessity to avoid escaping attempts on execution.
 494        self.execute(index.create_sql(model, self), params=None)
 495
 496    def remove_index(self, model: type[Model], index: Index) -> None:
 497        """Remove an index from a model."""
 498        if (
 499            index.contains_expressions
 500            and not self.connection.features.supports_expression_indexes
 501        ):
 502            return None
 503        self.execute(index.remove_sql(model, self))
 504
 505    def rename_index(
 506        self, model: type[Model], old_index: Index, new_index: Index
 507    ) -> None:
 508        if self.connection.features.can_rename_index:
 509            self.execute(
 510                self._rename_index_sql(model, old_index.name, new_index.name),
 511                params=None,
 512            )
 513        else:
 514            self.remove_index(model, old_index)
 515            self.add_index(model, new_index)
 516
 517    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 518        """Add a constraint to a model."""
 519        sql = constraint.create_sql(model, self)
 520        if sql:
 521            # Constraint.create_sql returns interpolated SQL which makes
 522            # params=None a necessity to avoid escaping attempts on execution.
 523            self.execute(sql, params=None)
 524
 525    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 526        """Remove a constraint from a model."""
 527        sql = constraint.remove_sql(model, self)
 528        if sql:
 529            self.execute(sql)
 530
 531    def alter_db_table(
 532        self, model: type[Model], old_db_table: str, new_db_table: str
 533    ) -> None:
 534        """Rename the table a model points to."""
 535        if old_db_table == new_db_table or (
 536            self.connection.features.ignores_table_name_case
 537            and old_db_table.lower() == new_db_table.lower()
 538        ):
 539            return
 540        self.execute(
 541            self.sql_rename_table
 542            % {
 543                "old_table": self.quote_name(old_db_table),
 544                "new_table": self.quote_name(new_db_table),
 545            }
 546        )
 547        # Rename all references to the old table name.
 548        for sql in self.deferred_sql:
 549            if isinstance(sql, Statement):
 550                sql.rename_table_references(old_db_table, new_db_table)
 551
 552    def alter_db_table_comment(
 553        self,
 554        model: type[Model],
 555        old_db_table_comment: str | None,
 556        new_db_table_comment: str | None,
 557    ) -> None:
 558        self.execute(
 559            self.sql_alter_table_comment
 560            % {
 561                "table": self.quote_name(model.model_options.db_table),
 562                "comment": self.quote_value(new_db_table_comment or ""),
 563            }
 564        )
 565
    def add_field(self, model: type[Model], field: Field) -> None:
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).

        The column is added with its default included, the database-level
        default is then dropped again where applicable, a column comment is
        written if needed, and index (and non-inline FK) statements are
        queued in `self.deferred_sql`. Returns early without executing
        anything when the field has no column definition.
        """
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint  # type: ignore[attr-defined]
        ):
            # After checking these attributes, we know field is a ForeignKey
            fk_field: ForeignKey = field  # type: ignore[assignment]
            assert fk_field.remote_field is not None  # Checked in if condition
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = fk_field.remote_field.model.model_options.db_table
                to_column = fk_field.remote_field.model._model_meta.get_field(
                    fk_field.remote_field.field_name
                ).column
                # Only the namespace part of the (possibly qualified) table
                # name is used for the inline template.
                namespace, _ = split_identifier(model.model_options.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(
                        model, fk_field, constraint_suffix
                    ),
                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
                    "column": self.quote_name(fk_field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, fk_field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model.model_options.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            # `params` is rebound here to the parameters of the DROP DEFAULT
            # change; the ADD COLUMN parameters are no longer needed.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
 651
 652    def remove_field(self, model: type[Model], field: Field) -> None:
 653        """
 654        Remove a field from a model. Usually involves deleting a column,
 655        but for M2Ms may involve deleting a table.
 656        """
 657        # It might not actually have a column behind it
 658        if field.db_parameters(connection=self.connection)["type"] is None:
 659            return
 660        # Drop any FK constraints, MySQL requires explicit deletion
 661        if field.remote_field:
 662            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 663            for fk_name in fk_names:
 664                self.execute(self._delete_fk_sql(model, fk_name))
 665        # Delete the column
 666        sql = self.sql_delete_column % {
 667            "table": self.quote_name(model.model_options.db_table),
 668            "column": self.quote_name(field.column),
 669        }
 670        self.execute(sql)
 671        # Reset connection if required
 672        if self.connection.features.connection_persists_old_columns:
 673            self.connection.close()
 674        # Remove all deferred statements referencing the deleted column.
 675        for sql in list(self.deferred_sql):
 676            if isinstance(sql, Statement) and sql.references_column(
 677                model.model_options.db_table, field.column
 678            ):
 679                self.deferred_sql.remove(sql)
 680
 681    def alter_field(
 682        self,
 683        model: type[Model],
 684        old_field: Field,
 685        new_field: Field,
 686        strict: bool = False,
 687    ) -> None:
 688        """
 689        Allow a field's type, uniqueness, nullability, default, column,
 690        constraints, etc. to be modified.
 691        `old_field` is required to compute the necessary changes.
 692        If `strict` is True, raise errors if the old column does not match
 693        `old_field` precisely.
 694        """
 695        if not self._field_should_be_altered(old_field, new_field):
 696            return
 697        # Ensure this field is even column-based
 698        old_db_params = old_field.db_parameters(connection=self.connection)
 699        old_type = old_db_params["type"]
 700        new_db_params = new_field.db_parameters(connection=self.connection)
 701        new_type = new_db_params["type"]
 702        if (old_type is None and old_field.remote_field is None) or (
 703            new_type is None and new_field.remote_field is None
 704        ):
 705            raise ValueError(
 706                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 707                "db_type (are you using a badly-written custom field?)",
 708            )
 709        elif (
 710            old_type is None
 711            and new_type is None
 712            and (old_field.remote_field.through and new_field.remote_field.through)  # type: ignore[attr-defined]
 713        ):
 714            # Both sides have through models; this is a no-op.
 715            return
 716        elif old_type is None or new_type is None:
 717            raise ValueError(
 718                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 719                "(you cannot alter to or from M2M fields, or add or remove "
 720                "through= on M2M fields)"
 721            )
 722
 723        self._alter_field(
 724            model,
 725            old_field,
 726            new_field,
 727            old_type,
 728            new_type,
 729            old_db_params,
 730            new_db_params,
 731            strict,
 732        )
 733
 734    def _field_db_check(
 735        self, field: Field, field_db_params: dict[str, Any]
 736    ) -> str | None:
 737        # Always check constraints with the same mocked column name to avoid
 738        # recreating constrains when the column is renamed.
 739        check_constraints = self.connection.data_type_check_constraints
 740        data = field.db_type_parameters(self.connection)
 741        data["column"] = "__column_name__"
 742        try:
 743            return check_constraints[field.get_internal_type()] % data
 744        except KeyError:
 745            return None
 746
    def _alter_field(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        old_type: str,
        new_type: str,
        old_db_params: dict[str, Any],
        new_db_params: dict[str, Any],
        strict: bool = False,
    ) -> None:
        """
        Perform a "physical" (non-ManyToMany) field update.

        Statements are issued through self.execute() in this order: drop the
        constraints and indexes that the change invalidates (the field's own
        FK, unique, incoming FKs, index, check), rename the column, apply
        type / default / nullability changes, drop or create the primary key,
        then recreate the index, FKs and check constraint required by
        `new_field`.

        `old_type`/`new_type` are the database type strings and
        `old_db_params`/`new_db_params` the db_parameters() dicts for the two
        fields. If `strict` is True, raise ValueError when the number of
        existing foreign key, unique, or check constraints found for the old
        column is not exactly one.
        """
        # Drop any FK constraints, we'll remake them later
        # fks_dropped collects a (column,) tuple per dropped FK; further down
        # only its truthiness is used to decide whether to recreate the FK.
        fks_dropped = set()
        if (
            self.connection.features.supports_foreign_keys
            and old_field.remote_field
            and old_field.db_constraint  # type: ignore[attr-defined]
            and self._field_should_be_altered(
                old_field,
                new_field,
                ignore={"db_comment"},
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for fk_name in fk_names:
                fks_dropped.add((old_field.column,))
                self.execute(self._delete_fk_sql(model, fk_name))
        # Has unique been removed?
        # NOTE(review): the condition tests primary_key rather than a separate
        # unique flag, and _field_became_primary_key() cannot be true when
        # old_field.primary_key is truthy, so only the first operand matters.
        if old_field.primary_key and (
            not new_field.primary_key
            or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                sql = self._delete_unique_sql(model, constraint_name)
                if sql is not None:
                    self.execute(sql)
        # Drop incoming FK constraints if the field is a primary key or unique,
        # which might be a to_field target, and things are going to change.
        old_collation = old_db_params.get("collation")
        new_collation = new_db_params.get("collation")
        drop_foreign_keys = (
            self.connection.features.supports_foreign_keys
            and (old_field.primary_key and new_field.primary_key)
            and ((old_type != new_type) or (old_collation != new_collation))
        )
        if drop_foreign_keys:
            # '_model_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if db_index switched to False or a unique constraint
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        #
        # NOTE(review): in the condition below "db_index" is only counted when
        # the field also has a remote_field.
        if (
            (old_field.remote_field and old_field.db_index)  # type: ignore[attr-defined]
            and not old_field.primary_key
            and (
                not (new_field.remote_field and new_field.db_index)  # type: ignore[attr-defined]
                or new_field.primary_key
            )
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model.model_options.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints?
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                sql = self._delete_check_sql(model, constraint_name)
                if sql is not None:
                    self.execute(sql)
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model.model_options.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column.
            # Only Statement objects in deferred_sql are updated; other
            # entries are left untouched.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model.model_options.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do
        # actions: fragments for sql_alter_column; null_actions: NULL/NOT NULL
        # fragments; post_actions: complete statements run after the alters.
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
        # Type, collation, or comment change?
        if (
            old_type != new_type
            or old_type_suffix != new_type_suffix
            or old_collation != new_collation
            or (
                self.connection.features.supports_comments
                and old_field.db_comment != new_field.db_comment
            )
        ):
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type, old_collation, new_collation
            )
            actions.append(fragment)
            post_actions.extend(other_actions)
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if old_field.allow_null and not new_field.allow_null:
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if (
                not self.skip_default_on_alter(new_field)
                and old_default != new_default
                and new_default is not None
            ):
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.allow_null != new_field.allow_null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = new_field.has_default() and (
            old_field.allow_null and not new_field.allow_null
        )
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together if we can (e.g. postgres)
            if self.connection.features.supports_combined_alters and actions:
                # zip(*actions) splits the (sql, params) pairs into two tuples;
                # sum(params, []) concatenates the parameter lists.
                sql, params = tuple(zip(*actions))
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model.model_options.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                # Update existing rows with default value
                # new_default is bound here: a four-way alteration implies the
                # NULL -> NOT NULL branch above was taken.
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": self.quote_name(model.model_options.db_table),
                        "column": self.quote_name(new_field.column),
                        "default": "%s",
                    },
                    [new_default],
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": self.quote_name(model.model_options.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)

        # Added an index? Add an index if db_index switched to True or a unique
        # constraint will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        if (
            (
                not (old_field.remote_field and old_field.db_index)  # type: ignore[attr-defined]
                or old_field.primary_key
            )
            and (new_field.remote_field and new_field.db_index)  # type: ignore[attr-defined]
            and not new_field.primary_key
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
            rel_type = rel_db_params["type"]
            rel_collation = rel_db_params.get("collation")
            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
            old_rel_collation = old_rel_db_params.get("collation")
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
                old_rel_collation,
                rel_collation,
            )
            # fragment is a (sql, params) pair.
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(
                        new_rel.related_model.model_options.db_table
                    ),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key?
        # Create it when the old FK was dropped above, or when the old field
        # had no FK constraint at all.
        if (
            self.connection.features.supports_foreign_keys
            and new_field.remote_field
            and (
                fks_dropped or not old_field.remote_field or not old_field.db_constraint  # type: ignore[attr-defined]
            )
            and new_field.db_constraint  # type: ignore[attr-defined]
        ):
            # After checking these attributes, we know new_field is a ForeignKey
            fk_field: ForeignKey = new_field  # type: ignore[assignment]
            assert fk_field.remote_field is not None  # Checked in if condition
            self.execute(
                self._create_fk_sql(model, fk_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model.model_options.db_table, [new_field.column], suffix="_check"
            )
            sql = self._create_check_sql(model, constraint_name, new_db_params["check"])
            if sql is not None:
                self.execute(sql)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
1090
1091    def _alter_column_null_sql(
1092        self, model: type[Model], old_field: Field, new_field: Field
1093    ) -> tuple[str, list[Any]]:
1094        """
1095        Hook to specialize column null alteration.
1096
1097        Return a (sql, params) fragment to set a column to null or non-null
1098        as required by new_field, or None if no changes are required.
1099        """
1100        new_db_params = new_field.db_parameters(connection=self.connection)
1101        sql = (
1102            self.sql_alter_column_null
1103            if new_field.allow_null
1104            else self.sql_alter_column_not_null
1105        )
1106        return (
1107            sql
1108            % {
1109                "column": self.quote_name(new_field.column),
1110                "type": new_db_params["type"],
1111            },
1112            [],
1113        )
1114
1115    def _alter_column_default_sql(
1116        self,
1117        model: type[Model],
1118        old_field: Field | None,
1119        new_field: Field,
1120        drop: bool = False,
1121    ) -> tuple[str, list[Any]]:
1122        """
1123        Hook to specialize column default alteration.
1124
1125        Return a (sql, params) fragment to add or drop (depending on the drop
1126        argument) a default to new_field's column.
1127        """
1128        new_default = self.effective_default(new_field)
1129        default = self._column_default_sql(new_field)
1130        params = [new_default]
1131
1132        if drop:
1133            params = []
1134        elif self.connection.features.requires_literal_defaults:
1135            # Some databases (Oracle) can't take defaults as a parameter
1136            # If this is the case, the SchemaEditor for that database should
1137            # implement prepare_default().
1138            default = self.prepare_default(new_default)
1139            params = []
1140
1141        new_db_params = new_field.db_parameters(connection=self.connection)
1142        if drop:
1143            if new_field.allow_null:
1144                sql = self.sql_alter_column_no_default_null
1145            else:
1146                sql = self.sql_alter_column_no_default
1147        else:
1148            sql = self.sql_alter_column_default
1149        return (
1150            sql
1151            % {
1152                "column": self.quote_name(new_field.column),
1153                "type": new_db_params["type"],
1154                "default": default,
1155            },
1156            params,
1157        )
1158
1159    def _alter_column_type_sql(
1160        self,
1161        model: type[Model],
1162        old_field: Field,
1163        new_field: Field,
1164        new_type: str,
1165        old_collation: str | None,
1166        new_collation: str | None,
1167    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1168        """
1169        Hook to specialize column type alteration for different backends,
1170        for cases when a creation type is different to an alteration type
1171        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1172
1173        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1174        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1175        run once the field is altered.
1176        """
1177        other_actions = []
1178        if collate_sql := self._collate_sql(
1179            new_collation, old_collation, model.model_options.db_table
1180        ):
1181            collate_sql = f" {collate_sql}"
1182        else:
1183            collate_sql = ""
1184        # Comment change?
1185        comment_sql = ""
1186        if self.connection.features.supports_comments and not new_field.many_to_many:
1187            if old_field.db_comment != new_field.db_comment:
1188                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1189                # 'COMMENT ON ...' at the same time.
1190                sql, params = self._alter_column_comment_sql(
1191                    model, new_field, new_type, new_field.db_comment
1192                )
1193                if sql:
1194                    other_actions.append((sql, params))
1195            if new_field.db_comment:
1196                comment_sql = self._comment_sql(new_field.db_comment)
1197        return (
1198            (
1199                self.sql_alter_column_type
1200                % {
1201                    "column": self.quote_name(new_field.column),
1202                    "type": new_type,
1203                    "collation": collate_sql,
1204                    "comment": comment_sql,
1205                },
1206                [],
1207            ),
1208            other_actions,
1209        )
1210
1211    def _alter_column_comment_sql(
1212        self,
1213        model: type[Model],
1214        new_field: Field,
1215        new_type: str,
1216        new_db_comment: str | None,
1217    ) -> tuple[str, list[Any]]:
1218        return (
1219            self.sql_alter_column_comment
1220            % {
1221                "table": self.quote_name(model.model_options.db_table),
1222                "column": self.quote_name(new_field.column),
1223                "comment": self._comment_sql(new_db_comment),
1224            },
1225            [],
1226        )
1227
1228    def _comment_sql(self, comment: str | None) -> str:
1229        return self.quote_value(comment or "")
1230
1231    def _alter_many_to_many(
1232        self,
1233        model: type[Model],
1234        old_field: ManyToManyField,
1235        new_field: ManyToManyField,
1236        strict: bool,
1237    ) -> None:
1238        """Alter M2Ms to repoint their to= endpoints."""
1239        # Type narrow for ManyToManyField.remote_field
1240        old_rel: ManyToManyRel = old_field.remote_field  # type: ignore[assignment]
1241        new_rel: ManyToManyRel = new_field.remote_field  # type: ignore[assignment]
1242
1243        # Rename the through table
1244        if (
1245            old_rel.through.model_options.db_table
1246            != new_rel.through.model_options.db_table
1247        ):
1248            self.alter_db_table(
1249                old_rel.through,
1250                old_rel.through.model_options.db_table,
1251                new_rel.through.model_options.db_table,
1252            )
1253        # Repoint the FK to the other side
1254        self.alter_field(
1255            new_rel.through,
1256            # The field that points to the target model is needed, so we can
1257            # tell alter_field to change it - this is m2m_reverse_field_name()
1258            # (as opposed to m2m_field_name(), which points to our model).
1259            old_rel.through._model_meta.get_field(
1260                old_field.m2m_reverse_field_name()  # type: ignore[attr-defined]
1261            ),
1262            new_rel.through._model_meta.get_field(
1263                new_field.m2m_reverse_field_name()  # type: ignore[attr-defined]
1264            ),
1265        )
1266        self.alter_field(
1267            new_rel.through,
1268            # for self-referential models we need to alter field from the other end too
1269            old_rel.through._model_meta.get_field(old_field.m2m_field_name()),
1270            new_rel.through._model_meta.get_field(new_field.m2m_field_name()),
1271        )
1272
1273    def _create_index_name(
1274        self, table_name: str, column_names: list[str], suffix: str = ""
1275    ) -> str:
1276        """
1277        Generate a unique name for an index/unique constraint.
1278
1279        The name is divided into 3 parts: the table name, the column names,
1280        and a unique digest and suffix.
1281        """
1282        _, table_name = split_identifier(table_name)
1283        hash_suffix_part = (
1284            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1285        )
1286        max_length = self.connection.ops.max_name_length() or 200
1287        # If everything fits into max_length, use that name.
1288        index_name = "{}_{}_{}".format(
1289            table_name, "_".join(column_names), hash_suffix_part
1290        )
1291        if len(index_name) <= max_length:
1292            return index_name
1293        # Shorten a long suffix.
1294        if len(hash_suffix_part) > max_length / 3:
1295            hash_suffix_part = hash_suffix_part[: max_length // 3]
1296        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1297        index_name = "{}_{}_{}".format(
1298            table_name[:other_length],
1299            "_".join(column_names)[:other_length],
1300            hash_suffix_part,
1301        )
1302        # Prepend D if needed to prevent the name from starting with an
1303        # underscore or a number (not permitted on Oracle).
1304        if index_name[0] == "_" or index_name[0].isdigit():
1305            index_name = f"D{index_name[:-1]}"
1306        return index_name
1307
1308    def _index_condition_sql(self, condition: str | None) -> str:
1309        if condition:
1310            return " WHERE " + condition
1311        return ""
1312
1313    def _index_include_sql(
1314        self, model: type[Model], columns: list[str] | None
1315    ) -> str | Statement:
1316        if not columns or not self.connection.features.supports_covering_indexes:
1317            return ""
1318        return Statement(
1319            " INCLUDE (%(columns)s)",
1320            columns=Columns(model.model_options.db_table, columns, self.quote_name),
1321        )
1322
1323    def _create_index_sql(
1324        self,
1325        model: type[Model],
1326        *,
1327        fields: list[Field] | None = None,
1328        name: str | None = None,
1329        suffix: str = "",
1330        using: str = "",
1331        col_suffixes: tuple[str, ...] = (),
1332        sql: str | None = None,
1333        opclasses: tuple[str, ...] = (),
1334        condition: str | None = None,
1335        include: list[str] | None = None,
1336        expressions: Any = None,
1337    ) -> Statement:
1338        """
1339        Return the SQL statement to create the index for one or several fields
1340        or expressions. `sql` can be specified if the syntax differs from the
1341        standard (GIS indexes, ...).
1342        """
1343        fields = fields or []
1344        expressions = expressions or []
1345        compiler = Query(model, alias_cols=False).get_compiler()
1346        columns = [field.column for field in fields]
1347        sql_create_index = sql or self.sql_create_index
1348        table = model.model_options.db_table
1349
1350        def create_index_name(*args: Any, **kwargs: Any) -> str:
1351            nonlocal name
1352            if name is None:
1353                name = self._create_index_name(*args, **kwargs)
1354            return self.quote_name(name)
1355
1356        return Statement(
1357            sql_create_index,
1358            table=Table(table, self.quote_name),
1359            name=IndexName(table, columns, suffix, create_index_name),
1360            using=using,
1361            columns=(
1362                self._index_columns(table, columns, col_suffixes, opclasses)
1363                if columns
1364                else Expressions(table, expressions, compiler, self.quote_value)
1365            ),
1366            extra="",
1367            condition=self._index_condition_sql(condition),
1368            include=self._index_include_sql(model, include),
1369        )
1370
1371    def _delete_index_sql(
1372        self, model: type[Model], name: str, sql: str | None = None
1373    ) -> Statement:
1374        return Statement(
1375            sql or self.sql_delete_index,
1376            table=Table(model.model_options.db_table, self.quote_name),
1377            name=self.quote_name(name),
1378        )
1379
1380    def _rename_index_sql(
1381        self, model: type[Model], old_name: str, new_name: str
1382    ) -> Statement:
1383        return Statement(
1384            self.sql_rename_index,
1385            table=Table(model.model_options.db_table, self.quote_name),
1386            old_name=self.quote_name(old_name),
1387            new_name=self.quote_name(new_name),
1388        )
1389
1390    def _index_columns(
1391        self,
1392        table: str,
1393        columns: list[str],
1394        col_suffixes: tuple[str, ...],
1395        opclasses: tuple[str, ...],
1396    ) -> Columns:
1397        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1398
1399    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1400        """
1401        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1402        """
1403        output: list[Statement | None] = []
1404        for field in model._model_meta.local_fields:
1405            output.extend(self._field_indexes_sql(model, field))
1406
1407        for index in model.model_options.indexes:
1408            if (
1409                not index.contains_expressions
1410                or self.connection.features.supports_expression_indexes
1411            ):
1412                output.append(index.create_sql(model, self))
1413        return output
1414
1415    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1416        """
1417        Return a list of all index SQL statements for the specified field.
1418        """
1419        output: list[Statement] = []
1420        if self._field_should_be_indexed(model, field):
1421            output.append(self._create_index_sql(model, fields=[field]))
1422        return output
1423
1424    def _field_should_be_altered(
1425        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1426    ) -> bool:
1427        ignore = ignore or set()
1428        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1429        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1430        # Don't alter when:
1431        # - changing only a field name
1432        # - changing an attribute that doesn't affect the schema
1433        # - changing an attribute in the provided set of ignored attributes
1434        # - adding only a db_column and the column name is not changed
1435        for attr in ignore.union(old_field.non_db_attrs):  # type: ignore[attr-defined]
1436            old_kwargs.pop(attr, None)
1437        for attr in ignore.union(new_field.non_db_attrs):  # type: ignore[attr-defined]
1438            new_kwargs.pop(attr, None)
1439        return self.quote_name(old_field.column) != self.quote_name(
1440            new_field.column
1441        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1442
1443    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1444        return (field.remote_field and field.db_index) and not field.primary_key  # type: ignore[attr-defined]
1445
1446    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1447        return not old_field.primary_key and new_field.primary_key
1448
1449    def _rename_field_sql(
1450        self, table: str, old_field: Field, new_field: Field, new_type: str
1451    ) -> str:
1452        return self.sql_rename_column % {
1453            "table": self.quote_name(table),
1454            "old_column": self.quote_name(old_field.column),
1455            "new_column": self.quote_name(new_field.column),
1456            "type": new_type,
1457        }
1458
1459    def _create_fk_sql(
1460        self, model: type[Model], field: ForeignKey, suffix: str
1461    ) -> Statement:
1462        table = Table(model.model_options.db_table, self.quote_name)
1463        name = self._fk_constraint_name(model, field, suffix)
1464        column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1465        to_table = Table(
1466            field.target_field.model.model_options.db_table, self.quote_name
1467        )
1468        to_column = Columns(
1469            field.target_field.model.model_options.db_table,
1470            [field.target_field.column],  # type: ignore[attr-defined]
1471            self.quote_name,
1472        )
1473        deferrable = self.connection.ops.deferrable_sql()
1474        return Statement(
1475            self.sql_create_fk,
1476            table=table,
1477            name=name,
1478            column=column,
1479            to_table=to_table,
1480            to_column=to_column,
1481            deferrable=deferrable,
1482        )
1483
1484    def _fk_constraint_name(
1485        self, model: type[Model], field: ForeignKey, suffix: str
1486    ) -> ForeignKeyName:
1487        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1488            return self.quote_name(self._create_index_name(*args, **kwargs))
1489
1490        return ForeignKeyName(
1491            model.model_options.db_table,
1492            [field.column],
1493            split_identifier(field.target_field.model.model_options.db_table)[1],
1494            [field.target_field.column],  # type: ignore[attr-defined]
1495            suffix,
1496            create_fk_name,
1497        )
1498
1499    def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1500        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1501
1502    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1503        if deferrable is None:
1504            return ""
1505        if deferrable == Deferrable.DEFERRED:
1506            return " DEFERRABLE INITIALLY DEFERRED"
1507        if deferrable == Deferrable.IMMEDIATE:
1508            return " DEFERRABLE INITIALLY IMMEDIATE"
1509        return ""
1510
1511    def _unique_sql(
1512        self,
1513        model: type[Model],
1514        fields: Iterable[Field],
1515        name: str,
1516        condition: str | None = None,
1517        deferrable: Deferrable | None = None,
1518        include: list[str] | None = None,
1519        opclasses: tuple[str, ...] | None = None,
1520        expressions: Any = None,
1521    ) -> str | None:
1522        if (
1523            deferrable
1524            and not self.connection.features.supports_deferrable_unique_constraints
1525        ):
1526            return None
1527        if condition or include or opclasses or expressions:
1528            # Databases support conditional, covering, and functional unique
1529            # constraints via a unique index.
1530            sql = self._create_unique_sql(
1531                model,
1532                fields,
1533                name=name,
1534                condition=condition,
1535                include=include,
1536                opclasses=opclasses,
1537                expressions=expressions,
1538            )
1539            if sql:
1540                self.deferred_sql.append(sql)
1541            return None
1542        constraint = self.sql_unique_constraint % {
1543            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1544            "deferrable": self._deferrable_constraint_sql(deferrable),
1545        }
1546        return self.sql_constraint % {
1547            "name": self.quote_name(name),
1548            "constraint": constraint,
1549        }
1550
1551    def _create_unique_sql(
1552        self,
1553        model: type[Model],
1554        fields: Iterable[Field],
1555        name: str | None = None,
1556        condition: str | None = None,
1557        deferrable: Deferrable | None = None,
1558        include: list[str] | None = None,
1559        opclasses: tuple[str, ...] | None = None,
1560        expressions: Any = None,
1561    ) -> Statement | None:
1562        if (
1563            (
1564                deferrable
1565                and not self.connection.features.supports_deferrable_unique_constraints
1566            )
1567            or (condition and not self.connection.features.supports_partial_indexes)
1568            or (include and not self.connection.features.supports_covering_indexes)
1569            or (
1570                expressions and not self.connection.features.supports_expression_indexes
1571            )
1572        ):
1573            return None
1574
1575        compiler = Query(model, alias_cols=False).get_compiler()
1576        table = model.model_options.db_table
1577        columns = [field.column for field in fields]
1578        constraint_name: IndexName | str
1579        if name is None:
1580            constraint_name = self._unique_constraint_name(table, columns, quote=True)
1581        else:
1582            constraint_name = self.quote_name(name)
1583        if condition or include or opclasses or expressions:
1584            sql = self.sql_create_unique_index
1585        else:
1586            sql = self.sql_create_unique
1587        if columns:
1588            columns_obj: Columns | Expressions = self._index_columns(
1589                table, columns, col_suffixes=(), opclasses=opclasses or ()
1590            )
1591        else:
1592            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1593        return Statement(
1594            sql,
1595            table=Table(table, self.quote_name),
1596            name=constraint_name,
1597            columns=columns_obj,
1598            condition=self._index_condition_sql(condition),
1599            deferrable=self._deferrable_constraint_sql(deferrable),
1600            include=self._index_include_sql(model, include),
1601        )
1602
1603    def _unique_constraint_name(
1604        self, table: str, columns: list[str], quote: bool = True
1605    ) -> IndexName | str:
1606        if quote:
1607
1608            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1609                return self.quote_name(self._create_index_name(*args, **kwargs))
1610
1611        else:
1612            create_unique_name = self._create_index_name
1613
1614        return IndexName(table, columns, "_uniq", create_unique_name)
1615
1616    def _delete_unique_sql(
1617        self,
1618        model: type[Model],
1619        name: str,
1620        condition: str | None = None,
1621        deferrable: Deferrable | None = None,
1622        include: list[str] | None = None,
1623        opclasses: tuple[str, ...] | None = None,
1624        expressions: Any = None,
1625    ) -> Statement | None:
1626        if (
1627            (
1628                deferrable
1629                and not self.connection.features.supports_deferrable_unique_constraints
1630            )
1631            or (condition and not self.connection.features.supports_partial_indexes)
1632            or (include and not self.connection.features.supports_covering_indexes)
1633            or (
1634                expressions and not self.connection.features.supports_expression_indexes
1635            )
1636        ):
1637            return None
1638        if condition or include or opclasses or expressions:
1639            sql = self.sql_delete_index
1640        else:
1641            sql = self.sql_delete_unique
1642        return self._delete_constraint_sql(sql, model, name)
1643
1644    def _check_sql(self, name: str, check: str) -> str:
1645        return self.sql_constraint % {
1646            "name": self.quote_name(name),
1647            "constraint": self.sql_check_constraint % {"check": check},
1648        }
1649
1650    def _create_check_sql(
1651        self, model: type[Model], name: str, check: str
1652    ) -> Statement | None:
1653        if not self.connection.features.supports_table_check_constraints:
1654            return None
1655        return Statement(
1656            self.sql_create_check,
1657            table=Table(model.model_options.db_table, self.quote_name),
1658            name=self.quote_name(name),
1659            check=check,
1660        )
1661
1662    def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1663        if not self.connection.features.supports_table_check_constraints:
1664            return None
1665        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1666
1667    def _delete_constraint_sql(
1668        self, template: str, model: type[Model], name: str
1669    ) -> Statement:
1670        return Statement(
1671            template,
1672            table=Table(model.model_options.db_table, self.quote_name),
1673            name=self.quote_name(name),
1674        )
1675
1676    def _constraint_names(
1677        self,
1678        model: type[Model],
1679        column_names: list[str] | None = None,
1680        unique: bool | None = None,
1681        primary_key: bool | None = None,
1682        index: bool | None = None,
1683        foreign_key: bool | None = None,
1684        check: bool | None = None,
1685        type_: str | None = None,
1686        exclude: set[str] | None = None,
1687    ) -> list[str]:
1688        """Return all constraint names matching the columns and conditions."""
1689        if column_names is not None:
1690            column_names = [
1691                self.connection.introspection.identifier_converter(
1692                    truncate_name(name, self.connection.ops.max_name_length())
1693                )
1694                if self.connection.features.truncates_names
1695                else self.connection.introspection.identifier_converter(name)
1696                for name in column_names
1697            ]
1698        with self.connection.cursor() as cursor:
1699            constraints = self.connection.introspection.get_constraints(
1700                cursor, model.model_options.db_table
1701            )
1702        result: list[str] = []
1703        for name, infodict in constraints.items():
1704            if column_names is None or column_names == infodict["columns"]:
1705                if unique is not None and infodict["unique"] != unique:
1706                    continue
1707                if primary_key is not None and infodict["primary_key"] != primary_key:
1708                    continue
1709                if index is not None and infodict["index"] != index:
1710                    continue
1711                if check is not None and infodict["check"] != check:
1712                    continue
1713                if foreign_key is not None and not infodict["foreign_key"]:
1714                    continue
1715                if type_ is not None and infodict["type"] != type_:
1716                    continue
1717                if not exclude or name not in exclude:
1718                    result.append(name)
1719        return result
1720
1721    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1722        constraint_names = self._constraint_names(model, primary_key=True)
1723        if strict and len(constraint_names) != 1:
1724            raise ValueError(
1725                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1726            )
1727        for constraint_name in constraint_names:
1728            self.execute(self._delete_primary_key_sql(model, constraint_name))
1729
1730    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1731        return Statement(
1732            self.sql_create_pk,
1733            table=Table(model.model_options.db_table, self.quote_name),
1734            name=self.quote_name(
1735                self._create_index_name(
1736                    model.model_options.db_table, [field.column], suffix="_pk"
1737                )
1738            ),
1739            columns=Columns(
1740                model.model_options.db_table, [field.column], self.quote_name
1741            ),
1742        )
1743
1744    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1745        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1746
1747    def _collate_sql(
1748        self,
1749        collation: str | None,
1750        old_collation: str | None = None,
1751        table_name: str | None = None,
1752    ) -> str:
1753        return "COLLATE " + self.quote_name(collation) if collation else ""