Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import logging
   4import operator
   5from collections.abc import Generator
   6from datetime import datetime
   7from typing import TYPE_CHECKING, Any
   8
   9from plain.models.backends.ddl_references import (
  10    Columns,
  11    Expressions,
  12    ForeignKeyName,
  13    IndexName,
  14    Statement,
  15    Table,
  16)
  17from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  18from plain.models.constraints import Deferrable
  19from plain.models.indexes import Index
  20from plain.models.sql import Query
  21from plain.models.transaction import TransactionManagementError, atomic
  22from plain.utils import timezone
  23
  24if TYPE_CHECKING:
  25    from collections.abc import Iterable
  26
  27    from plain.models.backends.base.base import BaseDatabaseWrapper
  28    from plain.models.base import Model
  29    from plain.models.constraints import BaseConstraint
  30    from plain.models.fields import Field
  31    from plain.models.fields.related import ForeignKey, ManyToManyField
  32    from plain.models.fields.reverse_related import ManyToManyRel
  33
  34logger = logging.getLogger("plain.models.backends.schema")
  35
  36
  37def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
  38    """
  39    When altering the given field, must constraints on its model from the given
  40    relation be temporarily dropped?
  41    """
  42    field = relation.field
  43    if field.many_to_many:
  44        # M2M reverse field
  45        return False
  46    if altered_field.primary_key:
  47        # Foreign key constraint on the primary key, which is being altered.
  48        return True
  49    # ForeignKey always targets 'id'
  50    return altered_field.name == "id"
  51
  52
  53def _all_related_fields(model: type[Model]) -> list[Any]:
  54    # Related fields must be returned in a deterministic order.
  55    return sorted(
  56        model._model_meta._get_fields(
  57            forward=False,
  58            reverse=True,
  59            include_hidden=True,
  60        ),
  61        key=operator.attrgetter("name"),
  62    )
  63
  64
  65def _related_non_m2m_objects(
  66    old_field: Field, new_field: Field
  67) -> Generator[tuple[Any, Any], None, None]:
  68    # Filter out m2m objects from reverse relations.
  69    # Return (old_relation, new_relation) tuples.
  70    related_fields = zip(
  71        (
  72            obj
  73            for obj in _all_related_fields(old_field.model)
  74            if _is_relevant_relation(obj, old_field)
  75        ),
  76        (
  77            obj
  78            for obj in _all_related_fields(new_field.model)
  79            if _is_relevant_relation(obj, new_field)
  80        ),
  81    )
  82    for old_rel, new_rel in related_fields:
  83        yield old_rel, new_rel
  84        yield from _related_non_m2m_objects(
  85            old_rel.remote_field,
  86            new_rel.remote_field,
  87        )
  88
  89
class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.

    The class attributes below are %-style SQL templates with named
    placeholders (e.g. ``%(table)s``); the methods of this class fill them in
    before executing the resulting statement.
    """

    # Overrideable SQL templates

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be substituted into the %(changes)s slot of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic statement for dropping a
    # constraint by name.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints (dropping reuses the generic constraint statement).
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints (dropping reuses the generic constraint statement).
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The two inline templates are None here; table_sql() and
    # add_field() only emit an inline FK clause when they are set and defer a
    # separate FK statement otherwise.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys (dropping reuses the generic constraint statement).
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # Table and column comments.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 158
 159    def __init__(
 160        self,
 161        connection: BaseDatabaseWrapper,
 162        atomic: bool = True,
 163    ):
 164        self.connection = connection
 165        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 166
 167    # State-managing methods
 168
 169    def __enter__(self) -> BaseDatabaseSchemaEditor:
 170        self.deferred_sql: list[Any] = []
 171        self.executed_sql: list[str] = []
 172        if self.atomic_migration:
 173            self.atomic = atomic()
 174            self.atomic.__enter__()
 175        return self
 176
 177    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 178        if exc_type is None:
 179            for sql in self.deferred_sql:
 180                self.execute(sql)
 181        if self.atomic_migration:
 182            self.atomic.__exit__(exc_type, exc_value, traceback)
 183
 184    # Core utility functions
 185
 186    def execute(
 187        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 188    ) -> None:
 189        """Execute the given SQL statement, with optional parameters."""
 190        if (
 191            self.connection.in_atomic_block
 192            and not self.connection.features.can_rollback_ddl
 193        ):
 194            raise TransactionManagementError(
 195                "Executing DDL statements while in a transaction on databases "
 196                "that can't perform a rollback is prohibited."
 197            )
 198        # Account for non-string statement objects.
 199        sql = str(sql)
 200        # Log the command we're running, then run it
 201        logger.debug(
 202            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 203        )
 204
 205        # Track executed SQL for display in migration output
 206        # Store the SQL for display (interpolate params for readability)
 207        if params:
 208            self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
 209        else:
 210            self.executed_sql.append(sql)
 211
 212        with self.connection.cursor() as cursor:
 213            cursor.execute(sql, params)
 214
 215    def quote_name(self, name: str) -> str:
 216        return self.connection.ops.quote_name(name)
 217
    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
        """
        Take a model and return its table definition.

        Returns a ``(sql, params)`` pair: the CREATE TABLE statement built from
        `sql_create_table` and the parameters collected from the column
        definitions. As a side effect, foreign key statements (when no inline
        FK template is set) and any autoincrement SQL are appended to
        `self.deferred_sql`.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._model_meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            if definition is None:
                # column_sql() returns None for fields without a column type.
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += f" {col_type_suffix}"
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:  # type: ignore[attr-defined]
                to_table = field.remote_field.model.model_options.db_table
                to_column = field.remote_field.model._model_meta.get_field(
                    field.remote_field.field_name
                ).column
                if self.sql_create_inline_fk:
                    # The backend has an inline form: put the reference
                    # directly in the column definition.
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    # Otherwise the constraint is created by a deferred
                    # statement.
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in ("PrimaryKeyField",):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model.model_options.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        # Model-level constraints are rendered after the columns.
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model.model_options.constraints
        ]
        sql = self.sql_create_table % {
            "table": self.quote_name(model.model_options.db_table),
            # Falsy entries (e.g. a constraint that rendered to nothing) are
            # left out of the definition list.
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        return sql, params
 277
 278    # Field <-> database mapping functions
 279
 280    def _iter_column_sql(
 281        self,
 282        column_db_type: str,
 283        params: list[Any],
 284        model: type[Model],
 285        field: Field,
 286        field_db_params: dict[str, Any],
 287        include_default: bool,
 288    ) -> Generator[str, None, None]:
 289        yield column_db_type
 290        if collation := field_db_params.get("collation"):
 291            yield self._collate_sql(collation)
 292        if self.connection.features.supports_comments_inline and field.db_comment:
 293            yield self._comment_sql(field.db_comment)
 294        # Work out nullability.
 295        null = field.allow_null
 296        # Include a default value, if requested.
 297        include_default = (
 298            include_default
 299            and not self.skip_default(field)
 300            and
 301            # Don't include a default value if it's a nullable field and the
 302            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 303            # MySQL longtext and longblob).
 304            not (null and self.skip_default_on_alter(field))
 305        )
 306        if include_default:
 307            default_value = self.effective_default(field)
 308            if default_value is not None:
 309                column_default = "DEFAULT " + self._column_default_sql(field)
 310                if self.connection.features.requires_literal_defaults:
 311                    # Some databases can't take defaults as a parameter (Oracle).
 312                    # If this is the case, the individual schema backend should
 313                    # implement prepare_default().
 314                    yield column_default % self.prepare_default(default_value)
 315                else:
 316                    yield column_default
 317                    params.append(default_value)
 318
 319        if not null:
 320            yield "NOT NULL"
 321        elif not self.connection.features.implied_column_null:
 322            yield "NULL"
 323
 324        if field.primary_key:
 325            yield "PRIMARY KEY"
 326
 327    def column_sql(
 328        self, model: type[Model], field: Field, include_default: bool = False
 329    ) -> tuple[str | None, list[Any] | None]:
 330        """
 331        Return the column definition for a field. The field must already have
 332        had set_attributes_from_name() called.
 333        """
 334        # Get the column's type and use that as the basis of the SQL.
 335        field_db_params = field.db_parameters(connection=self.connection)
 336        column_db_type = field_db_params["type"]
 337        # Check for fields that aren't actually columns (e.g. M2M).
 338        if column_db_type is None:
 339            return None, None
 340        params: list[Any] = []
 341        return (
 342            " ".join(
 343                # This appends to the params being returned.
 344                self._iter_column_sql(
 345                    column_db_type,
 346                    params,
 347                    model,
 348                    field,
 349                    field_db_params,
 350                    include_default,
 351                )
 352            ),
 353            params,
 354        )
 355
 356    def skip_default(self, field: Field) -> bool:
 357        """
 358        Some backends don't accept default values for certain columns types
 359        (i.e. MySQL longtext and longblob).
 360        """
 361        return False
 362
 363    def skip_default_on_alter(self, field: Field) -> bool:
 364        """
 365        Some backends don't accept default values for certain columns types
 366        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 367        """
 368        return False
 369
 370    def prepare_default(self, value: Any) -> str:
 371        """
 372        Only used for backends which have requires_literal_defaults feature
 373        """
 374        raise NotImplementedError(
 375            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 376            "requires_literal_defaults must provide a prepare_default() method"
 377        )
 378
 379    def _column_default_sql(self, field: Field) -> str:
 380        """
 381        Return the SQL to use in a DEFAULT clause. The resulting string should
 382        contain a '%s' placeholder for a default value.
 383        """
 384        return "%s"
 385
 386    @staticmethod
 387    def _effective_default(field: Field) -> Any:
 388        # This method allows testing its logic without a connection.
 389        if field.has_default():
 390            default = field.get_default()
 391        elif (
 392            not field.allow_null and not field.required and field.empty_strings_allowed
 393        ):
 394            if field.get_internal_type() == "BinaryField":
 395                default = b""
 396            else:
 397                default = ""
 398        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 399            internal_type = field.get_internal_type()
 400            if internal_type == "DateTimeField":
 401                default = timezone.now()
 402            else:
 403                default = datetime.now()
 404                if internal_type == "DateField":
 405                    default = default.date()
 406                elif internal_type == "TimeField":
 407                    default = default.time()
 408        else:
 409            default = None
 410        return default
 411
 412    def effective_default(self, field: Field) -> Any:
 413        """Return a field's effective database default value."""
 414        return field.get_db_prep_save(self._effective_default(field), self.connection)
 415
 416    def quote_value(self, value: Any) -> str:
 417        """
 418        Return a quoted version of the value so it's safe to use in an SQL
 419        string. This is not safe against injection from user code; it is
 420        intended only for use in making SQL scripts or preparing default values
 421        for particularly tricky backends (defaults are not user-defined, though,
 422        so this is safe).
 423        """
 424        raise NotImplementedError()
 425
 426    # Actions
 427
 428    def create_model(self, model: type[Model]) -> None:
 429        """
 430        Create a table and any accompanying indexes or unique constraints for
 431        the given `model`.
 432        """
 433        sql, params = self.table_sql(model)
 434        # Prevent using [] as params, in the case a literal '%' is used in the
 435        # definition.
 436        self.execute(sql, params or None)
 437
 438        if self.connection.features.supports_comments:
 439            # Add table comment.
 440            if model.model_options.db_table_comment:
 441                self.alter_db_table_comment(
 442                    model, None, model.model_options.db_table_comment
 443                )
 444            # Add column comments.
 445            if not self.connection.features.supports_comments_inline:
 446                for field in model._model_meta.local_fields:
 447                    if field.db_comment:
 448                        field_db_params = field.db_parameters(
 449                            connection=self.connection
 450                        )
 451                        field_type = field_db_params["type"]
 452                        self.execute(
 453                            *self._alter_column_comment_sql(
 454                                model, field, field_type, field.db_comment
 455                            )
 456                        )
 457        # Add any field index (deferred as SQLite _remake_table needs it).
 458        self.deferred_sql.extend(self._model_indexes_sql(model))
 459
 460    def delete_model(self, model: type[Model]) -> None:
 461        """Delete a model from the database."""
 462
 463        # Delete the table
 464        self.execute(
 465            self.sql_delete_table
 466            % {
 467                "table": self.quote_name(model.model_options.db_table),
 468            }
 469        )
 470        # Remove all deferred statements referencing the deleted table.
 471        for sql in list(self.deferred_sql):
 472            if isinstance(sql, Statement) and sql.references_table(
 473                model.model_options.db_table
 474            ):
 475                self.deferred_sql.remove(sql)
 476
 477    def add_index(self, model: type[Model], index: Index) -> None:
 478        """Add an index on a model."""
 479        if (
 480            index.contains_expressions
 481            and not self.connection.features.supports_expression_indexes
 482        ):
 483            return None
 484        # Index.create_sql returns interpolated SQL which makes params=None a
 485        # necessity to avoid escaping attempts on execution.
 486        self.execute(index.create_sql(model, self), params=None)
 487
 488    def remove_index(self, model: type[Model], index: Index) -> None:
 489        """Remove an index from a model."""
 490        if (
 491            index.contains_expressions
 492            and not self.connection.features.supports_expression_indexes
 493        ):
 494            return None
 495        self.execute(index.remove_sql(model, self))
 496
 497    def rename_index(
 498        self, model: type[Model], old_index: Index, new_index: Index
 499    ) -> None:
 500        if self.connection.features.can_rename_index:
 501            self.execute(
 502                self._rename_index_sql(model, old_index.name, new_index.name),
 503                params=None,
 504            )
 505        else:
 506            self.remove_index(model, old_index)
 507            self.add_index(model, new_index)
 508
 509    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 510        """Add a constraint to a model."""
 511        sql = constraint.create_sql(model, self)
 512        if sql:
 513            # Constraint.create_sql returns interpolated SQL which makes
 514            # params=None a necessity to avoid escaping attempts on execution.
 515            self.execute(sql, params=None)
 516
 517    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 518        """Remove a constraint from a model."""
 519        sql = constraint.remove_sql(model, self)
 520        if sql:
 521            self.execute(sql)
 522
 523    def alter_db_table(
 524        self, model: type[Model], old_db_table: str, new_db_table: str
 525    ) -> None:
 526        """Rename the table a model points to."""
 527        if old_db_table == new_db_table or (
 528            self.connection.features.ignores_table_name_case
 529            and old_db_table.lower() == new_db_table.lower()
 530        ):
 531            return
 532        self.execute(
 533            self.sql_rename_table
 534            % {
 535                "old_table": self.quote_name(old_db_table),
 536                "new_table": self.quote_name(new_db_table),
 537            }
 538        )
 539        # Rename all references to the old table name.
 540        for sql in self.deferred_sql:
 541            if isinstance(sql, Statement):
 542                sql.rename_table_references(old_db_table, new_db_table)
 543
 544    def alter_db_table_comment(
 545        self,
 546        model: type[Model],
 547        old_db_table_comment: str | None,
 548        new_db_table_comment: str | None,
 549    ) -> None:
 550        self.execute(
 551            self.sql_alter_table_comment
 552            % {
 553                "table": self.quote_name(model.model_options.db_table),
 554                "comment": self.quote_value(new_db_table_comment or ""),
 555            }
 556        )
 557
    def add_field(self, model: type[Model], field: Field) -> None:
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).

        The column is added with its default included, and the default is
        dropped again right afterwards unless the backend skips that step.
        Foreign key constraints are added inline when the backend defines
        `sql_create_column_inline_fk` and deferred otherwise; index SQL for
        the field is always deferred.
        """
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint  # type: ignore[attr-defined]
        ):
            # Name suffix template passed to the FK naming/creation helpers.
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model.model_options.db_table
                to_column = field.remote_field.model._model_meta.get_field(
                    field.remote_field.field_name
                ).column
                # Only the namespace part of the split table name is used here.
                namespace, _ = split_identifier(model.model_options.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model.model_options.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            # `sql` and `params` are rebound here; the column statement above
            # has already been executed.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
 638
 639    def remove_field(self, model: type[Model], field: Field) -> None:
 640        """
 641        Remove a field from a model. Usually involves deleting a column,
 642        but for M2Ms may involve deleting a table.
 643        """
 644        # It might not actually have a column behind it
 645        if field.db_parameters(connection=self.connection)["type"] is None:
 646            return
 647        # Drop any FK constraints, MySQL requires explicit deletion
 648        if field.remote_field:
 649            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 650            for fk_name in fk_names:
 651                self.execute(self._delete_fk_sql(model, fk_name))
 652        # Delete the column
 653        sql = self.sql_delete_column % {
 654            "table": self.quote_name(model.model_options.db_table),
 655            "column": self.quote_name(field.column),
 656        }
 657        self.execute(sql)
 658        # Reset connection if required
 659        if self.connection.features.connection_persists_old_columns:
 660            self.connection.close()
 661        # Remove all deferred statements referencing the deleted column.
 662        for sql in list(self.deferred_sql):
 663            if isinstance(sql, Statement) and sql.references_column(
 664                model.model_options.db_table, field.column
 665            ):
 666                self.deferred_sql.remove(sql)
 667
 668    def alter_field(
 669        self,
 670        model: type[Model],
 671        old_field: Field,
 672        new_field: Field,
 673        strict: bool = False,
 674    ) -> None:
 675        """
 676        Allow a field's type, uniqueness, nullability, default, column,
 677        constraints, etc. to be modified.
 678        `old_field` is required to compute the necessary changes.
 679        If `strict` is True, raise errors if the old column does not match
 680        `old_field` precisely.
 681        """
 682        if not self._field_should_be_altered(old_field, new_field):
 683            return
 684        # Ensure this field is even column-based
 685        old_db_params = old_field.db_parameters(connection=self.connection)
 686        old_type = old_db_params["type"]
 687        new_db_params = new_field.db_parameters(connection=self.connection)
 688        new_type = new_db_params["type"]
 689        if (old_type is None and old_field.remote_field is None) or (
 690            new_type is None and new_field.remote_field is None
 691        ):
 692            raise ValueError(
 693                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 694                "db_type (are you using a badly-written custom field?)",
 695            )
 696        elif (
 697            old_type is None
 698            and new_type is None
 699            and (old_field.remote_field.through and new_field.remote_field.through)  # type: ignore[attr-defined]
 700        ):
 701            # Both sides have through models; this is a no-op.
 702            return
 703        elif old_type is None or new_type is None:
 704            raise ValueError(
 705                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 706                "(you cannot alter to or from M2M fields, or add or remove "
 707                "through= on M2M fields)"
 708            )
 709
 710        self._alter_field(
 711            model,
 712            old_field,
 713            new_field,
 714            old_type,
 715            new_type,
 716            old_db_params,
 717            new_db_params,
 718            strict,
 719        )
 720
 721    def _field_db_check(
 722        self, field: Field, field_db_params: dict[str, Any]
 723    ) -> str | None:
 724        # Always check constraints with the same mocked column name to avoid
 725        # recreating constrains when the column is renamed.
 726        check_constraints = self.connection.data_type_check_constraints
 727        data = field.db_type_parameters(self.connection)
 728        data["column"] = "__column_name__"
 729        try:
 730            return check_constraints[field.get_internal_type()] % data
 731        except KeyError:
 732            return None
 733
    def _alter_field(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        old_type: str,
        new_type: str,
        old_db_params: dict[str, Any],
        new_db_params: dict[str, Any],
        strict: bool = False,
    ) -> None:
        """
        Perform a "physical" (non-ManyToMany) field update.

        The statements are executed in this order:

        1. Drop the column's own foreign key constraints, unique constraints
           (when the primary key flag is removed), foreign keys on related
           models that reference an altered primary key, the field index and
           the data-type check constraint - each only when the comparison of
           `old_field` and `new_field` calls for it.
        2. Rename the column and update column references held by
           `Statement` objects in `self.deferred_sql`.
        3. Apply type/collation/comment, default and NULL/NOT NULL changes,
           then any follow-up statements returned by
           `_alter_column_type_sql()`.
        4. Delete or create the primary key, create the field index, alter
           the columns of relations that reference this field and recreate
           the foreign key and check constraints.
        5. Drop the temporary database default and, if the backend's
           `connection_persists_old_columns` feature is set, close the
           connection.

        `old_type`/`new_type` and `old_db_params`/`new_db_params` are the
        database types and db parameter dicts of the two fields.

        Raise ValueError when `strict` is true and the number of foreign key,
        unique or check constraints found for the old column is not exactly
        one.
        """
        # Drop any FK constraints, we'll remake them later
        fks_dropped = set()
        if (
            self.connection.features.supports_foreign_keys
            and old_field.remote_field
            and old_field.db_constraint  # type: ignore[attr-defined]
            and self._field_should_be_altered(
                old_field,
                new_field,
                ignore={"db_comment"},
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for fk_name in fk_names:
                # Only the truthiness of fks_dropped is consulted later, when
                # deciding whether the FK has to be recreated.
                fks_dropped.add((old_field.column,))
                self.execute(self._delete_fk_sql(model, fk_name))
        # Has the primary key flag been removed? If so, drop the unique
        # constraints found on the old column.
        # NOTE(review): _field_became_primary_key() requires
        # `not old_field.primary_key`, so inside this `old_field.primary_key`
        # guard the second operand can never be true - confirm whether it is
        # still needed.
        if old_field.primary_key and (
            not new_field.primary_key
            or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field, skipping constraints
            # declared in model_options.constraints.
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_unique_sql(model, constraint_name))
        # Drop incoming FK constraints if the field stays a primary key but
        # its type or collation is going to change.
        old_collation = old_db_params.get("collation")
        new_collation = new_db_params.get("collation")
        drop_foreign_keys = (
            self.connection.features.supports_foreign_keys
            and (old_field.primary_key and new_field.primary_key)
            and ((old_type != new_type) or (old_collation != new_collation))
        )
        if drop_foreign_keys:
            # '_model_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if the field stopped being indexed or the primary key
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False. Note that the
        # "db_index" columns correspond to `remote_field and db_index` in the
        # condition below.
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        if (
            (old_field.remote_field and old_field.db_index)  # type: ignore[attr-defined]
            and not old_field.primary_key
            and (
                not (new_field.remote_field and new_field.db_index)  # type: ignore[attr-defined]
                or new_field.primary_key
            )
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model.model_options.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints?
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_check_sql(model, constraint_name))
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model.model_options.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model.model_options.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do
        # actions: (sql, params) fragments for the main ALTER COLUMN pass.
        # null_actions: NULL/NOT NULL fragments, which may have to run later.
        # post_actions: complete statements to run after the alterations.
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
        # Type, collation, or comment change?
        if (
            old_type != new_type
            or old_type_suffix != new_type_suffix
            or old_collation != new_collation
            or (
                self.connection.features.supports_comments
                and old_field.db_comment != new_field.db_comment
            )
        ):
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type, old_collation, new_collation
            )
            actions.append(fragment)
            post_actions.extend(other_actions)
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if old_field.allow_null and not new_field.allow_null:
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if (
                not self.skip_default_on_alter(new_field)
                and old_default != new_default
                and new_default is not None
            ):
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.allow_null != new_field.allow_null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = new_field.has_default() and (
            old_field.allow_null and not new_field.allow_null
        )
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together if we can (e.g. postgres)
            if self.connection.features.supports_combined_alters and actions:
                sql, params = tuple(zip(*actions))
                # Join the fragments into one statement and flatten the
                # per-fragment parameter lists into a single list.
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model.model_options.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                # Update existing rows with default value.
                # new_default is always bound here: a 4-way alteration implies
                # the NULL -> NOT NULL branch above was taken.
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": self.quote_name(model.model_options.db_table),
                        "column": self.quote_name(new_field.column),
                        "default": "%s",
                    },
                    [new_default],
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": self.quote_name(model.model_options.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)

        # Added an index? Add an index if the field became indexed or the
        # primary key will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False.
        # Note that the "db_index" columns correspond to
        # `remote_field and db_index` in the condition below.
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        if (
            (
                not (old_field.remote_field and old_field.db_index)  # type: ignore[attr-defined]
                or old_field.primary_key
            )
            and (new_field.remote_field and new_field.db_index)  # type: ignore[attr-defined]
            and not new_field.primary_key
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
            rel_type = rel_db_params["type"]
            rel_collation = rel_db_params.get("collation")
            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
            old_rel_collation = old_rel_db_params.get("collation")
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
                old_rel_collation,
                rel_collation,
            )
            # fragment is a (sql, params) pair for the ALTER COLUMN statement.
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(
                        new_rel.related_model.model_options.db_table
                    ),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key? Create it when the old one was dropped
        # above or when the old field had no FK constraint at all.
        if (
            self.connection.features.supports_foreign_keys
            and new_field.remote_field
            and (
                fks_dropped or not old_field.remote_field or not old_field.db_constraint  # type: ignore[attr-defined]
            )
            and new_field.db_constraint  # type: ignore[attr-defined]
        ):
            self.execute(
                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model.model_options.db_table, [new_field.column], suffix="_check"
            )
            self.execute(
                self._create_check_sql(model, constraint_name, new_db_params["check"])
            )
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
1070
1071    def _alter_column_null_sql(
1072        self, model: type[Model], old_field: Field, new_field: Field
1073    ) -> tuple[str, list[Any]]:
1074        """
1075        Hook to specialize column null alteration.
1076
1077        Return a (sql, params) fragment to set a column to null or non-null
1078        as required by new_field, or None if no changes are required.
1079        """
1080        new_db_params = new_field.db_parameters(connection=self.connection)
1081        sql = (
1082            self.sql_alter_column_null
1083            if new_field.allow_null
1084            else self.sql_alter_column_not_null
1085        )
1086        return (
1087            sql
1088            % {
1089                "column": self.quote_name(new_field.column),
1090                "type": new_db_params["type"],
1091            },
1092            [],
1093        )
1094
1095    def _alter_column_default_sql(
1096        self,
1097        model: type[Model],
1098        old_field: Field | None,
1099        new_field: Field,
1100        drop: bool = False,
1101    ) -> tuple[str, list[Any]]:
1102        """
1103        Hook to specialize column default alteration.
1104
1105        Return a (sql, params) fragment to add or drop (depending on the drop
1106        argument) a default to new_field's column.
1107        """
1108        new_default = self.effective_default(new_field)
1109        default = self._column_default_sql(new_field)
1110        params = [new_default]
1111
1112        if drop:
1113            params = []
1114        elif self.connection.features.requires_literal_defaults:
1115            # Some databases (Oracle) can't take defaults as a parameter
1116            # If this is the case, the SchemaEditor for that database should
1117            # implement prepare_default().
1118            default = self.prepare_default(new_default)
1119            params = []
1120
1121        new_db_params = new_field.db_parameters(connection=self.connection)
1122        if drop:
1123            if new_field.allow_null:
1124                sql = self.sql_alter_column_no_default_null
1125            else:
1126                sql = self.sql_alter_column_no_default
1127        else:
1128            sql = self.sql_alter_column_default
1129        return (
1130            sql
1131            % {
1132                "column": self.quote_name(new_field.column),
1133                "type": new_db_params["type"],
1134                "default": default,
1135            },
1136            params,
1137        )
1138
1139    def _alter_column_type_sql(
1140        self,
1141        model: type[Model],
1142        old_field: Field,
1143        new_field: Field,
1144        new_type: str,
1145        old_collation: str | None,
1146        new_collation: str | None,
1147    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1148        """
1149        Hook to specialize column type alteration for different backends,
1150        for cases when a creation type is different to an alteration type
1151        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1152
1153        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1154        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1155        run once the field is altered.
1156        """
1157        other_actions = []
1158        if collate_sql := self._collate_sql(
1159            new_collation, old_collation, model.model_options.db_table
1160        ):
1161            collate_sql = f" {collate_sql}"
1162        else:
1163            collate_sql = ""
1164        # Comment change?
1165        comment_sql = ""
1166        if self.connection.features.supports_comments and not new_field.many_to_many:
1167            if old_field.db_comment != new_field.db_comment:
1168                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1169                # 'COMMENT ON ...' at the same time.
1170                sql, params = self._alter_column_comment_sql(
1171                    model, new_field, new_type, new_field.db_comment
1172                )
1173                if sql:
1174                    other_actions.append((sql, params))
1175            if new_field.db_comment:
1176                comment_sql = self._comment_sql(new_field.db_comment)
1177        return (
1178            (
1179                self.sql_alter_column_type
1180                % {
1181                    "column": self.quote_name(new_field.column),
1182                    "type": new_type,
1183                    "collation": collate_sql,
1184                    "comment": comment_sql,
1185                },
1186                [],
1187            ),
1188            other_actions,
1189        )
1190
1191    def _alter_column_comment_sql(
1192        self,
1193        model: type[Model],
1194        new_field: Field,
1195        new_type: str,
1196        new_db_comment: str | None,
1197    ) -> tuple[str, list[Any]]:
1198        return (
1199            self.sql_alter_column_comment
1200            % {
1201                "table": self.quote_name(model.model_options.db_table),
1202                "column": self.quote_name(new_field.column),
1203                "comment": self._comment_sql(new_db_comment),
1204            },
1205            [],
1206        )
1207
1208    def _comment_sql(self, comment: str | None) -> str:
1209        return self.quote_value(comment or "")
1210
1211    def _alter_many_to_many(
1212        self,
1213        model: type[Model],
1214        old_field: ManyToManyField,
1215        new_field: ManyToManyField,
1216        strict: bool,
1217    ) -> None:
1218        """Alter M2Ms to repoint their to= endpoints."""
1219        # Type narrow for ManyToManyField.remote_field
1220        old_rel: ManyToManyRel = old_field.remote_field  # type: ignore[assignment]
1221        new_rel: ManyToManyRel = new_field.remote_field  # type: ignore[assignment]
1222
1223        # Rename the through table
1224        if (
1225            old_rel.through.model_options.db_table
1226            != new_rel.through.model_options.db_table
1227        ):
1228            self.alter_db_table(
1229                old_rel.through,
1230                old_rel.through.model_options.db_table,
1231                new_rel.through.model_options.db_table,
1232            )
1233        # Repoint the FK to the other side
1234        self.alter_field(
1235            new_rel.through,
1236            # The field that points to the target model is needed, so we can
1237            # tell alter_field to change it - this is m2m_reverse_field_name()
1238            # (as opposed to m2m_field_name(), which points to our model).
1239            old_rel.through._model_meta.get_field(
1240                old_field.m2m_reverse_field_name()  # type: ignore[attr-defined]
1241            ),
1242            new_rel.through._model_meta.get_field(
1243                new_field.m2m_reverse_field_name()  # type: ignore[attr-defined]
1244            ),
1245        )
1246        self.alter_field(
1247            new_rel.through,
1248            # for self-referential models we need to alter field from the other end too
1249            old_rel.through._model_meta.get_field(old_field.m2m_field_name()),
1250            new_rel.through._model_meta.get_field(new_field.m2m_field_name()),
1251        )
1252
1253    def _create_index_name(
1254        self, table_name: str, column_names: list[str], suffix: str = ""
1255    ) -> str:
1256        """
1257        Generate a unique name for an index/unique constraint.
1258
1259        The name is divided into 3 parts: the table name, the column names,
1260        and a unique digest and suffix.
1261        """
1262        _, table_name = split_identifier(table_name)
1263        hash_suffix_part = (
1264            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1265        )
1266        max_length = self.connection.ops.max_name_length() or 200
1267        # If everything fits into max_length, use that name.
1268        index_name = "{}_{}_{}".format(
1269            table_name, "_".join(column_names), hash_suffix_part
1270        )
1271        if len(index_name) <= max_length:
1272            return index_name
1273        # Shorten a long suffix.
1274        if len(hash_suffix_part) > max_length / 3:
1275            hash_suffix_part = hash_suffix_part[: max_length // 3]
1276        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1277        index_name = "{}_{}_{}".format(
1278            table_name[:other_length],
1279            "_".join(column_names)[:other_length],
1280            hash_suffix_part,
1281        )
1282        # Prepend D if needed to prevent the name from starting with an
1283        # underscore or a number (not permitted on Oracle).
1284        if index_name[0] == "_" or index_name[0].isdigit():
1285            index_name = f"D{index_name[:-1]}"
1286        return index_name
1287
1288    def _index_condition_sql(self, condition: str | None) -> str:
1289        if condition:
1290            return " WHERE " + condition
1291        return ""
1292
1293    def _index_include_sql(
1294        self, model: type[Model], columns: list[str] | None
1295    ) -> str | Statement:
1296        if not columns or not self.connection.features.supports_covering_indexes:
1297            return ""
1298        return Statement(
1299            " INCLUDE (%(columns)s)",
1300            columns=Columns(model.model_options.db_table, columns, self.quote_name),
1301        )
1302
1303    def _create_index_sql(
1304        self,
1305        model: type[Model],
1306        *,
1307        fields: list[Field] | None = None,
1308        name: str | None = None,
1309        suffix: str = "",
1310        using: str = "",
1311        col_suffixes: tuple[str, ...] = (),
1312        sql: str | None = None,
1313        opclasses: tuple[str, ...] = (),
1314        condition: str | None = None,
1315        include: list[str] | None = None,
1316        expressions: Any = None,
1317    ) -> Statement:
1318        """
1319        Return the SQL statement to create the index for one or several fields
1320        or expressions. `sql` can be specified if the syntax differs from the
1321        standard (GIS indexes, ...).
1322        """
1323        fields = fields or []
1324        expressions = expressions or []
1325        compiler = Query(model, alias_cols=False).get_compiler()
1326        columns = [field.column for field in fields]
1327        sql_create_index = sql or self.sql_create_index
1328        table = model.model_options.db_table
1329
1330        def create_index_name(*args: Any, **kwargs: Any) -> str:
1331            nonlocal name
1332            if name is None:
1333                name = self._create_index_name(*args, **kwargs)
1334            return self.quote_name(name)
1335
1336        return Statement(
1337            sql_create_index,
1338            table=Table(table, self.quote_name),
1339            name=IndexName(table, columns, suffix, create_index_name),
1340            using=using,
1341            columns=(
1342                self._index_columns(table, columns, col_suffixes, opclasses)
1343                if columns
1344                else Expressions(table, expressions, compiler, self.quote_value)
1345            ),
1346            extra="",
1347            condition=self._index_condition_sql(condition),
1348            include=self._index_include_sql(model, include),
1349        )
1350
1351    def _delete_index_sql(
1352        self, model: type[Model], name: str, sql: str | None = None
1353    ) -> Statement:
1354        return Statement(
1355            sql or self.sql_delete_index,
1356            table=Table(model.model_options.db_table, self.quote_name),
1357            name=self.quote_name(name),
1358        )
1359
1360    def _rename_index_sql(
1361        self, model: type[Model], old_name: str, new_name: str
1362    ) -> Statement:
1363        return Statement(
1364            self.sql_rename_index,
1365            table=Table(model.model_options.db_table, self.quote_name),
1366            old_name=self.quote_name(old_name),
1367            new_name=self.quote_name(new_name),
1368        )
1369
1370    def _index_columns(
1371        self,
1372        table: str,
1373        columns: list[str],
1374        col_suffixes: tuple[str, ...],
1375        opclasses: tuple[str, ...],
1376    ) -> Columns:
1377        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1378
1379    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1380        """
1381        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1382        """
1383        output: list[Statement | None] = []
1384        for field in model._model_meta.local_fields:
1385            output.extend(self._field_indexes_sql(model, field))
1386
1387        for index in model.model_options.indexes:
1388            if (
1389                not index.contains_expressions
1390                or self.connection.features.supports_expression_indexes
1391            ):
1392                output.append(index.create_sql(model, self))
1393        return output
1394
1395    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1396        """
1397        Return a list of all index SQL statements for the specified field.
1398        """
1399        output: list[Statement] = []
1400        if self._field_should_be_indexed(model, field):
1401            output.append(self._create_index_sql(model, fields=[field]))
1402        return output
1403
1404    def _field_should_be_altered(
1405        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1406    ) -> bool:
1407        ignore = ignore or set()
1408        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1409        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1410        # Don't alter when:
1411        # - changing only a field name
1412        # - changing an attribute that doesn't affect the schema
1413        # - changing an attribute in the provided set of ignored attributes
1414        # - adding only a db_column and the column name is not changed
1415        for attr in ignore.union(old_field.non_db_attrs):  # type: ignore[attr-defined]
1416            old_kwargs.pop(attr, None)
1417        for attr in ignore.union(new_field.non_db_attrs):  # type: ignore[attr-defined]
1418            new_kwargs.pop(attr, None)
1419        return self.quote_name(old_field.column) != self.quote_name(
1420            new_field.column
1421        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1422
1423    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1424        return (field.remote_field and field.db_index) and not field.primary_key  # type: ignore[attr-defined]
1425
1426    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1427        return not old_field.primary_key and new_field.primary_key
1428
1429    def _rename_field_sql(
1430        self, table: str, old_field: Field, new_field: Field, new_type: str
1431    ) -> str:
1432        return self.sql_rename_column % {
1433            "table": self.quote_name(table),
1434            "old_column": self.quote_name(old_field.column),
1435            "new_column": self.quote_name(new_field.column),
1436            "type": new_type,
1437        }
1438
1439    def _create_fk_sql(
1440        self, model: type[Model], field: ForeignKey, suffix: str
1441    ) -> Statement:
1442        table = Table(model.model_options.db_table, self.quote_name)
1443        name = self._fk_constraint_name(model, field, suffix)
1444        column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1445        to_table = Table(
1446            field.target_field.model.model_options.db_table, self.quote_name
1447        )
1448        to_column = Columns(
1449            field.target_field.model.model_options.db_table,
1450            [field.target_field.column],  # type: ignore[attr-defined]
1451            self.quote_name,
1452        )
1453        deferrable = self.connection.ops.deferrable_sql()
1454        return Statement(
1455            self.sql_create_fk,
1456            table=table,
1457            name=name,
1458            column=column,
1459            to_table=to_table,
1460            to_column=to_column,
1461            deferrable=deferrable,
1462        )
1463
1464    def _fk_constraint_name(
1465        self, model: type[Model], field: ForeignKey, suffix: str
1466    ) -> ForeignKeyName:
1467        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1468            return self.quote_name(self._create_index_name(*args, **kwargs))
1469
1470        return ForeignKeyName(
1471            model.model_options.db_table,
1472            [field.column],
1473            split_identifier(field.target_field.model.model_options.db_table)[1],
1474            [field.target_field.column],  # type: ignore[attr-defined]
1475            suffix,
1476            create_fk_name,
1477        )
1478
1479    def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1480        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1481
1482    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1483        if deferrable is None:
1484            return ""
1485        if deferrable == Deferrable.DEFERRED:
1486            return " DEFERRABLE INITIALLY DEFERRED"
1487        if deferrable == Deferrable.IMMEDIATE:
1488            return " DEFERRABLE INITIALLY IMMEDIATE"
1489        return ""
1490
1491    def _unique_sql(
1492        self,
1493        model: type[Model],
1494        fields: Iterable[Field],
1495        name: str,
1496        condition: str | None = None,
1497        deferrable: Deferrable | None = None,
1498        include: list[str] | None = None,
1499        opclasses: tuple[str, ...] | None = None,
1500        expressions: Any = None,
1501    ) -> str | None:
1502        if (
1503            deferrable
1504            and not self.connection.features.supports_deferrable_unique_constraints
1505        ):
1506            return None
1507        if condition or include or opclasses or expressions:
1508            # Databases support conditional, covering, and functional unique
1509            # constraints via a unique index.
1510            sql = self._create_unique_sql(
1511                model,
1512                fields,
1513                name=name,
1514                condition=condition,
1515                include=include,
1516                opclasses=opclasses,
1517                expressions=expressions,
1518            )
1519            if sql:
1520                self.deferred_sql.append(sql)
1521            return None
1522        constraint = self.sql_unique_constraint % {
1523            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1524            "deferrable": self._deferrable_constraint_sql(deferrable),
1525        }
1526        return self.sql_constraint % {
1527            "name": self.quote_name(name),
1528            "constraint": constraint,
1529        }
1530
1531    def _create_unique_sql(
1532        self,
1533        model: type[Model],
1534        fields: Iterable[Field],
1535        name: str | None = None,
1536        condition: str | None = None,
1537        deferrable: Deferrable | None = None,
1538        include: list[str] | None = None,
1539        opclasses: tuple[str, ...] | None = None,
1540        expressions: Any = None,
1541    ) -> Statement | None:
1542        if (
1543            (
1544                deferrable
1545                and not self.connection.features.supports_deferrable_unique_constraints
1546            )
1547            or (condition and not self.connection.features.supports_partial_indexes)
1548            or (include and not self.connection.features.supports_covering_indexes)
1549            or (
1550                expressions and not self.connection.features.supports_expression_indexes
1551            )
1552        ):
1553            return None
1554
1555        compiler = Query(model, alias_cols=False).get_compiler()
1556        table = model.model_options.db_table
1557        columns = [field.column for field in fields]
1558        if name is None:
1559            name = self._unique_constraint_name(table, columns, quote=True)
1560        else:
1561            name = self.quote_name(name)
1562        if condition or include or opclasses or expressions:
1563            sql = self.sql_create_unique_index
1564        else:
1565            sql = self.sql_create_unique
1566        if columns:
1567            columns_obj: Columns | Expressions = self._index_columns(
1568                table, columns, col_suffixes=(), opclasses=opclasses
1569            )
1570        else:
1571            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1572        return Statement(
1573            sql,
1574            table=Table(table, self.quote_name),
1575            name=name,
1576            columns=columns_obj,
1577            condition=self._index_condition_sql(condition),
1578            deferrable=self._deferrable_constraint_sql(deferrable),
1579            include=self._index_include_sql(model, include),
1580        )
1581
1582    def _unique_constraint_name(
1583        self, table: str, columns: list[str], quote: bool = True
1584    ) -> IndexName | str:
1585        if quote:
1586
1587            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1588                return self.quote_name(self._create_index_name(*args, **kwargs))
1589
1590        else:
1591            create_unique_name = self._create_index_name
1592
1593        return IndexName(table, columns, "_uniq", create_unique_name)
1594
1595    def _delete_unique_sql(
1596        self,
1597        model: type[Model],
1598        name: str,
1599        condition: str | None = None,
1600        deferrable: Deferrable | None = None,
1601        include: list[str] | None = None,
1602        opclasses: tuple[str, ...] | None = None,
1603        expressions: Any = None,
1604    ) -> Statement | None:
1605        if (
1606            (
1607                deferrable
1608                and not self.connection.features.supports_deferrable_unique_constraints
1609            )
1610            or (condition and not self.connection.features.supports_partial_indexes)
1611            or (include and not self.connection.features.supports_covering_indexes)
1612            or (
1613                expressions and not self.connection.features.supports_expression_indexes
1614            )
1615        ):
1616            return None
1617        if condition or include or opclasses or expressions:
1618            sql = self.sql_delete_index
1619        else:
1620            sql = self.sql_delete_unique
1621        return self._delete_constraint_sql(sql, model, name)
1622
1623    def _check_sql(self, name: str, check: str) -> str:
1624        return self.sql_constraint % {
1625            "name": self.quote_name(name),
1626            "constraint": self.sql_check_constraint % {"check": check},
1627        }
1628
1629    def _create_check_sql(
1630        self, model: type[Model], name: str, check: str
1631    ) -> Statement | None:
1632        if not self.connection.features.supports_table_check_constraints:
1633            return None
1634        return Statement(
1635            self.sql_create_check,
1636            table=Table(model.model_options.db_table, self.quote_name),
1637            name=self.quote_name(name),
1638            check=check,
1639        )
1640
1641    def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1642        if not self.connection.features.supports_table_check_constraints:
1643            return None
1644        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1645
1646    def _delete_constraint_sql(
1647        self, template: str, model: type[Model], name: str
1648    ) -> Statement:
1649        return Statement(
1650            template,
1651            table=Table(model.model_options.db_table, self.quote_name),
1652            name=self.quote_name(name),
1653        )
1654
1655    def _constraint_names(
1656        self,
1657        model: type[Model],
1658        column_names: list[str] | None = None,
1659        unique: bool | None = None,
1660        primary_key: bool | None = None,
1661        index: bool | None = None,
1662        foreign_key: bool | None = None,
1663        check: bool | None = None,
1664        type_: str | None = None,
1665        exclude: set[str] | None = None,
1666    ) -> list[str]:
1667        """Return all constraint names matching the columns and conditions."""
1668        if column_names is not None:
1669            column_names = [
1670                self.connection.introspection.identifier_converter(
1671                    truncate_name(name, self.connection.ops.max_name_length())
1672                )
1673                if self.connection.features.truncates_names
1674                else self.connection.introspection.identifier_converter(name)
1675                for name in column_names
1676            ]
1677        with self.connection.cursor() as cursor:
1678            constraints = self.connection.introspection.get_constraints(
1679                cursor, model.model_options.db_table
1680            )
1681        result: list[str] = []
1682        for name, infodict in constraints.items():
1683            if column_names is None or column_names == infodict["columns"]:
1684                if unique is not None and infodict["unique"] != unique:
1685                    continue
1686                if primary_key is not None and infodict["primary_key"] != primary_key:
1687                    continue
1688                if index is not None and infodict["index"] != index:
1689                    continue
1690                if check is not None and infodict["check"] != check:
1691                    continue
1692                if foreign_key is not None and not infodict["foreign_key"]:
1693                    continue
1694                if type_ is not None and infodict["type"] != type_:
1695                    continue
1696                if not exclude or name not in exclude:
1697                    result.append(name)
1698        return result
1699
1700    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1701        constraint_names = self._constraint_names(model, primary_key=True)
1702        if strict and len(constraint_names) != 1:
1703            raise ValueError(
1704                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1705            )
1706        for constraint_name in constraint_names:
1707            self.execute(self._delete_primary_key_sql(model, constraint_name))
1708
1709    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1710        return Statement(
1711            self.sql_create_pk,
1712            table=Table(model.model_options.db_table, self.quote_name),
1713            name=self.quote_name(
1714                self._create_index_name(
1715                    model.model_options.db_table, [field.column], suffix="_pk"
1716                )
1717            ),
1718            columns=Columns(
1719                model.model_options.db_table, [field.column], self.quote_name
1720            ),
1721        )
1722
1723    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1724        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1725
1726    def _collate_sql(
1727        self,
1728        collation: str | None,
1729        old_collation: str | None = None,
1730        table_name: str | None = None,
1731    ) -> str:
1732        return "COLLATE " + self.quote_name(collation) if collation else ""