Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import logging
   4import operator
   5from abc import ABC, abstractmethod
   6from collections.abc import Generator
   7from datetime import datetime
   8from typing import TYPE_CHECKING, Any
   9
  10if TYPE_CHECKING:
  11    from typing import Self
  12
  13from plain.models.backends.ddl_references import (
  14    Columns,
  15    Expressions,
  16    ForeignKeyName,
  17    IndexName,
  18    Statement,
  19    Table,
  20)
  21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  22from plain.models.constraints import Deferrable
  23from plain.models.fields import DbParameters, Field
  24from plain.models.fields.related import ForeignKeyField, RelatedField
  25from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
  26from plain.models.indexes import Index
  27from plain.models.sql import Query
  28from plain.models.transaction import TransactionManagementError, atomic
  29from plain.utils import timezone
  30
  31if TYPE_CHECKING:
  32    from collections.abc import Iterable
  33
  34    from plain.models.backends.base.base import BaseDatabaseWrapper
  35    from plain.models.base import Model
  36    from plain.models.constraints import BaseConstraint
  37    from plain.models.fields import Field
  38    from plain.models.fields.related import ForeignKeyField, ManyToManyField
  39    from plain.models.fields.reverse_related import ManyToManyRel
  40
  41logger = logging.getLogger("plain.models.backends.schema")
  42
  43
  44def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
  45    """
  46    When altering the given field, must constraints on its model from the given
  47    relation be temporarily dropped?
  48    """
  49    from plain.models.fields.related import ManyToManyField
  50
  51    field = relation.field
  52    if isinstance(field, ManyToManyField):
  53        # M2M reverse field
  54        return False
  55    if altered_field.primary_key:
  56        # Foreign key constraint on the primary key, which is being altered.
  57        return True
  58    # ForeignKeyField always targets 'id'
  59    return altered_field.name == "id"
  60
  61
  62def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
  63    # Related fields must be returned in a deterministic order.
  64    return sorted(
  65        model._model_meta._get_fields(
  66            forward=False,
  67            reverse=True,
  68        ),
  69        key=operator.attrgetter("name"),
  70    )
  71
  72
  73def _related_non_m2m_objects(
  74    old_field: Field, new_field: Field
  75) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
  76    # Filter out m2m objects from reverse relations.
  77    # Return (old_relation, new_relation) tuples.
  78    related_fields = zip(
  79        (
  80            obj
  81            for obj in _all_related_fields(old_field.model)
  82            if _is_relevant_relation(obj, old_field)
  83        ),
  84        (
  85            obj
  86            for obj in _all_related_fields(new_field.model)
  87            if _is_relevant_relation(obj, new_field)
  88        ),
  89    )
  90    for old_rel, new_rel in related_fields:
  91        yield old_rel, new_rel
  92        yield from _related_non_m2m_objects(
  93            old_rel.remote_field,
  94            new_rel.remote_field,
  95        )
  96
  97
  98class BaseDatabaseSchemaEditor(ABC):
  99    """
 100    This class and its subclasses are responsible for emitting schema-changing
 101    statements to the databases - model creation/removal/alteration, field
 102    renaming, index fiddling, and so on.
 103    """
 104
 105    # Overrideable SQL templates
 106    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
 107    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
 108    sql_delete_table = "DROP TABLE %(table)s CASCADE"
 109
 110    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
 111    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
 112    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
 113    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
 114    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
 115    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
 116    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
 117    sql_alter_column_no_default_null = sql_alter_column_no_default
 118    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
 119    sql_rename_column = (
 120        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
 121    )
 122    sql_update_with_default = (
 123        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
 124    )
 125
 126    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
 127    sql_check_constraint = "CHECK (%(check)s)"
 128    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
 129    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
 130
 131    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
 132    sql_delete_check = sql_delete_constraint
 133
 134    sql_create_unique = (
 135        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
 136        "UNIQUE (%(columns)s)%(deferrable)s"
 137    )
 138    sql_delete_unique = sql_delete_constraint
 139
 140    sql_create_fk = (
 141        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 142        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
 143    )
 144    sql_create_inline_fk = None
 145    sql_create_column_inline_fk = None
 146    sql_delete_fk = sql_delete_constraint
 147
 148    sql_create_index = (
 149        "CREATE INDEX %(name)s ON %(table)s "
 150        "(%(columns)s)%(include)s%(extra)s%(condition)s"
 151    )
 152    sql_create_unique_index = (
 153        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
 154        "(%(columns)s)%(include)s%(condition)s"
 155    )
 156    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
 157    sql_delete_index = "DROP INDEX %(name)s"
 158
 159    sql_create_pk = (
 160        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 161    )
 162    sql_delete_pk = sql_delete_constraint
 163
 164    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
 165    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 166
 167    def __init__(
 168        self,
 169        connection: BaseDatabaseWrapper,
 170        atomic: bool = True,
 171    ):
 172        self.connection = connection
 173        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 174
 175    # State-managing methods
 176
 177    def __enter__(self) -> Self:
 178        self.deferred_sql: list[Any] = []
 179        self.executed_sql: list[str] = []
 180        if self.atomic_migration:
 181            self.atomic = atomic()
 182            self.atomic.__enter__()
 183        return self
 184
 185    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 186        if exc_type is None:
 187            for sql in self.deferred_sql:
 188                self.execute(sql)
 189        if self.atomic_migration:
 190            self.atomic.__exit__(exc_type, exc_value, traceback)
 191
 192    # Core utility functions
 193
 194    def execute(
 195        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 196    ) -> None:
 197        """Execute the given SQL statement, with optional parameters."""
 198        if (
 199            self.connection.in_atomic_block
 200            and not self.connection.features.can_rollback_ddl
 201        ):
 202            raise TransactionManagementError(
 203                "Executing DDL statements while in a transaction on databases "
 204                "that can't perform a rollback is prohibited."
 205            )
 206        # Account for non-string statement objects.
 207        sql = str(sql)
 208        # Log the command we're running, then run it
 209        logger.debug(
 210            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 211        )
 212
 213        # Track executed SQL for display in migration output
 214        # Store the SQL for display (interpolate params for readability)
 215        if params:
 216            self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
 217        else:
 218            self.executed_sql.append(sql)
 219
 220        with self.connection.cursor() as cursor:
 221            cursor.execute(sql, params)
 222
 223    def quote_name(self, name: str) -> str:
 224        return self.connection.ops.quote_name(name)
 225
 226    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
 227        """Take a model and return its table definition."""
 228        # Create column SQL, add FK deferreds if needed.
 229        column_sqls = []
 230        params = []
 231        for field in model._model_meta.local_fields:
 232            # SQL.
 233            definition, extra_params = self.column_sql(model, field)
 234            if definition is None:
 235                continue
 236            # Check constraints can go on the column SQL here.
 237            db_params = field.db_parameters(connection=self.connection)
 238            if db_params["check"]:
 239                definition += " " + self.sql_check_constraint % db_params
 240            # Autoincrement SQL (for backends with inline variant).
 241            col_type_suffix = field.db_type_suffix(connection=self.connection)
 242            if col_type_suffix:
 243                definition += f" {col_type_suffix}"
 244            if extra_params:
 245                params.extend(extra_params)
 246            # FK.
 247            if isinstance(field, ForeignKeyField) and field.db_constraint:
 248                to_table = field.remote_field.model.model_options.db_table
 249                field_name = field.remote_field.field_name
 250                if field_name is None:
 251                    raise ValueError("Foreign key field_name cannot be None")
 252                to_field = field.remote_field.model._model_meta.get_forward_field(
 253                    field_name
 254                )
 255                to_column = to_field.column
 256                if self.sql_create_inline_fk:
 257                    definition += " " + self.sql_create_inline_fk % {
 258                        "to_table": self.quote_name(to_table),
 259                        "to_column": self.quote_name(to_column),
 260                    }
 261                elif self.connection.features.supports_foreign_keys:
 262                    self.deferred_sql.append(
 263                        self._create_fk_sql(
 264                            model, field, "_fk_%(to_table)s_%(to_column)s"
 265                        )
 266                    )
 267            # Add the SQL to our big list.
 268            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
 269        constraints = [
 270            constraint.constraint_sql(model, self)
 271            for constraint in model.model_options.constraints
 272        ]
 273        sql = self.sql_create_table % {
 274            "table": self.quote_name(model.model_options.db_table),
 275            "definition": ", ".join(
 276                str(constraint)
 277                for constraint in (*column_sqls, *constraints)
 278                if constraint
 279            ),
 280        }
 281        return sql, params
 282
 283    # Field <-> database mapping functions
 284
 285    def _iter_column_sql(
 286        self,
 287        column_db_type: str,
 288        params: list[Any],
 289        model: type[Model],
 290        field: Field,
 291        field_db_params: DbParameters,
 292        include_default: bool,
 293    ) -> Generator[str, None, None]:
 294        yield column_db_type
 295        if collation := field_db_params.get("collation"):
 296            yield self._collate_sql(collation)
 297        if self.connection.features.supports_comments_inline and field.db_comment:
 298            yield self._comment_sql(field.db_comment)
 299        # Work out nullability.
 300        null = field.allow_null
 301        # Include a default value, if requested.
 302        include_default = (
 303            include_default
 304            and not self.skip_default(field)
 305            and
 306            # Don't include a default value if it's a nullable field and the
 307            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 308            # MySQL longtext and longblob).
 309            not (null and self.skip_default_on_alter(field))
 310        )
 311        if include_default:
 312            default_value = self.effective_default(field)
 313            if default_value is not None:
 314                column_default = "DEFAULT " + self._column_default_sql(field)
 315                if self.connection.features.requires_literal_defaults:
 316                    # Some databases can't take defaults as a parameter (Oracle).
 317                    # If this is the case, the individual schema backend should
 318                    # implement prepare_default().
 319                    yield column_default % self.prepare_default(default_value)
 320                else:
 321                    yield column_default
 322                    params.append(default_value)
 323
 324        if not null:
 325            yield "NOT NULL"
 326        else:
 327            yield "NULL"
 328
 329        if field.primary_key:
 330            yield "PRIMARY KEY"
 331
 332    def column_sql(
 333        self, model: type[Model], field: Field, include_default: bool = False
 334    ) -> tuple[str | None, list[Any] | None]:
 335        """
 336        Return the column definition for a field. The field must already have
 337        had set_attributes_from_name() called.
 338        """
 339        # Get the column's type and use that as the basis of the SQL.
 340        field_db_params = field.db_parameters(connection=self.connection)
 341        column_db_type = field_db_params["type"]
 342        # Check for fields that aren't actually columns (e.g. M2M).
 343        if column_db_type is None:
 344            return None, None
 345        params: list[Any] = []
 346        return (
 347            " ".join(
 348                # This appends to the params being returned.
 349                self._iter_column_sql(
 350                    column_db_type,
 351                    params,
 352                    model,
 353                    field,
 354                    field_db_params,
 355                    include_default,
 356                )
 357            ),
 358            params,
 359        )
 360
 361    def skip_default(self, field: Field) -> bool:
 362        """
 363        Some backends don't accept default values for certain columns types
 364        (i.e. MySQL longtext and longblob).
 365        """
 366        return False
 367
 368    def skip_default_on_alter(self, field: Field) -> bool:
 369        """
 370        Some backends don't accept default values for certain columns types
 371        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 372        """
 373        return False
 374
 375    def prepare_default(self, value: Any) -> str:
 376        """
 377        Only used for backends which have requires_literal_defaults feature
 378        """
 379        raise NotImplementedError(
 380            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 381            "requires_literal_defaults must provide a prepare_default() method"
 382        )
 383
 384    def _column_default_sql(self, field: Field) -> str:
 385        """
 386        Return the SQL to use in a DEFAULT clause. The resulting string should
 387        contain a '%s' placeholder for a default value.
 388        """
 389        return "%s"
 390
 391    @staticmethod
 392    def _effective_default(field: Field) -> Any:
 393        # This method allows testing its logic without a connection.
 394        if field.has_default():
 395            default = field.get_default()
 396        elif (
 397            not field.allow_null and not field.required and field.empty_strings_allowed
 398        ):
 399            if field.get_internal_type() == "BinaryField":
 400                default = b""
 401            else:
 402                default = ""
 403        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 404            internal_type = field.get_internal_type()
 405            if internal_type == "DateTimeField":
 406                default = timezone.now()
 407            else:
 408                default = datetime.now()
 409                if internal_type == "DateField":
 410                    default = default.date()
 411                elif internal_type == "TimeField":
 412                    default = default.time()
 413        else:
 414            default = None
 415        return default
 416
 417    def effective_default(self, field: Field) -> Any:
 418        """Return a field's effective database default value."""
 419        return field.get_db_prep_save(self._effective_default(field), self.connection)
 420
 421    @abstractmethod
 422    def quote_value(self, value: Any) -> str:
 423        """
 424        Return a quoted version of the value so it's safe to use in an SQL
 425        string. This is not safe against injection from user code; it is
 426        intended only for use in making SQL scripts or preparing default values
 427        for particularly tricky backends (defaults are not user-defined, though,
 428        so this is safe).
 429        """
 430        ...
 431
 432    # Actions
 433
 434    def create_model(self, model: type[Model]) -> None:
 435        """
 436        Create a table and any accompanying indexes or unique constraints for
 437        the given `model`.
 438        """
 439        sql, params = self.table_sql(model)
 440        # Prevent using [] as params, in the case a literal '%' is used in the
 441        # definition.
 442        self.execute(sql, params or None)
 443
 444        if self.connection.features.supports_comments:
 445            # Add table comment.
 446            if model.model_options.db_table_comment:
 447                self.alter_db_table_comment(
 448                    model, None, model.model_options.db_table_comment
 449                )
 450            # Add column comments.
 451            if not self.connection.features.supports_comments_inline:
 452                for field in model._model_meta.local_fields:
 453                    if field.db_comment:
 454                        field_db_params = field.db_parameters(
 455                            connection=self.connection
 456                        )
 457                        field_type = field_db_params["type"]
 458                        assert field_type is not None
 459                        self.execute(
 460                            *self._alter_column_comment_sql(
 461                                model, field, field_type, field.db_comment
 462                            )
 463                        )
 464        # Add any field index (deferred as SQLite _remake_table needs it).
 465        self.deferred_sql.extend(self._model_indexes_sql(model))
 466
 467    def delete_model(self, model: type[Model]) -> None:
 468        """Delete a model from the database."""
 469
 470        # Delete the table
 471        self.execute(
 472            self.sql_delete_table
 473            % {
 474                "table": self.quote_name(model.model_options.db_table),
 475            }
 476        )
 477        # Remove all deferred statements referencing the deleted table.
 478        for sql in list(self.deferred_sql):
 479            if isinstance(sql, Statement) and sql.references_table(
 480                model.model_options.db_table
 481            ):
 482                self.deferred_sql.remove(sql)
 483
 484    def add_index(self, model: type[Model], index: Index) -> None:
 485        """Add an index on a model."""
 486        if (
 487            index.contains_expressions
 488            and not self.connection.features.supports_expression_indexes
 489        ):
 490            return None
 491        # Index.create_sql returns interpolated SQL which makes params=None a
 492        # necessity to avoid escaping attempts on execution.
 493        self.execute(index.create_sql(model, self), params=None)
 494
 495    def remove_index(self, model: type[Model], index: Index) -> None:
 496        """Remove an index from a model."""
 497        if (
 498            index.contains_expressions
 499            and not self.connection.features.supports_expression_indexes
 500        ):
 501            return None
 502        self.execute(index.remove_sql(model, self))
 503
 504    def rename_index(
 505        self, model: type[Model], old_index: Index, new_index: Index
 506    ) -> None:
 507        if self.connection.features.can_rename_index:
 508            self.execute(
 509                self._rename_index_sql(model, old_index.name, new_index.name),
 510                params=None,
 511            )
 512        else:
 513            self.remove_index(model, old_index)
 514            self.add_index(model, new_index)
 515
 516    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 517        """Add a constraint to a model."""
 518        sql = constraint.create_sql(model, self)
 519        if sql:
 520            # Constraint.create_sql returns interpolated SQL which makes
 521            # params=None a necessity to avoid escaping attempts on execution.
 522            self.execute(sql, params=None)
 523
 524    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 525        """Remove a constraint from a model."""
 526        sql = constraint.remove_sql(model, self)
 527        if sql:
 528            self.execute(sql)
 529
 530    def alter_db_table(
 531        self, model: type[Model], old_db_table: str, new_db_table: str
 532    ) -> None:
 533        """Rename the table a model points to."""
 534        if old_db_table == new_db_table or (
 535            self.connection.features.ignores_table_name_case
 536            and old_db_table.lower() == new_db_table.lower()
 537        ):
 538            return
 539        self.execute(
 540            self.sql_rename_table
 541            % {
 542                "old_table": self.quote_name(old_db_table),
 543                "new_table": self.quote_name(new_db_table),
 544            }
 545        )
 546        # Rename all references to the old table name.
 547        for sql in self.deferred_sql:
 548            if isinstance(sql, Statement):
 549                sql.rename_table_references(old_db_table, new_db_table)
 550
 551    def alter_db_table_comment(
 552        self,
 553        model: type[Model],
 554        old_db_table_comment: str | None,
 555        new_db_table_comment: str | None,
 556    ) -> None:
 557        self.execute(
 558            self.sql_alter_table_comment
 559            % {
 560                "table": self.quote_name(model.model_options.db_table),
 561                "comment": self.quote_value(new_db_table_comment or ""),
 562            }
 563        )
 564
 565    def add_field(self, model: type[Model], field: Field) -> None:
 566        """
 567        Create a field on a model. Usually involves adding a column, but may
 568        involve adding a table instead (for M2M fields).
 569        """
 570        # Get the column's definition
 571        definition, params = self.column_sql(model, field, include_default=True)
 572        # It might not actually have a column behind it
 573        if definition is None:
 574            return
 575        if col_type_suffix := field.db_type_suffix(connection=self.connection):
 576            definition += f" {col_type_suffix}"
 577        # Check constraints can go on the column SQL here
 578        db_params = field.db_parameters(connection=self.connection)
 579        if db_params["check"]:
 580            definition += " " + self.sql_check_constraint % db_params
 581        if (
 582            isinstance(field, ForeignKeyField)
 583            and self.connection.features.supports_foreign_keys
 584            and field.db_constraint
 585        ):
 586            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
 587            # Add FK constraint inline, if supported.
 588            if self.sql_create_column_inline_fk:
 589                to_table = field.remote_field.model.model_options.db_table
 590                field_name = field.remote_field.field_name
 591                if field_name is None:
 592                    raise ValueError("Foreign key field_name cannot be None")
 593                to_field = field.remote_field.model._model_meta.get_forward_field(
 594                    field_name
 595                )
 596                to_column = to_field.column
 597                namespace, _ = split_identifier(model.model_options.db_table)
 598                definition += " " + self.sql_create_column_inline_fk % {
 599                    "name": self._fk_constraint_name(model, field, constraint_suffix),
 600                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
 601                    "column": self.quote_name(field.column),
 602                    "to_table": self.quote_name(to_table),
 603                    "to_column": self.quote_name(to_column),
 604                    "deferrable": self.connection.ops.deferrable_sql(),
 605                }
 606            # Otherwise, add FK constraints later.
 607            else:
 608                self.deferred_sql.append(
 609                    self._create_fk_sql(model, field, constraint_suffix)
 610                )
 611        # Build the SQL and run it
 612        sql = self.sql_create_column % {
 613            "table": self.quote_name(model.model_options.db_table),
 614            "column": self.quote_name(field.column),
 615            "definition": definition,
 616        }
 617        self.execute(sql, params)
 618        # Drop the default if we need to
 619        # (Plain usually does not use in-database defaults)
 620        if (
 621            not self.skip_default_on_alter(field)
 622            and self.effective_default(field) is not None
 623        ):
 624            changes_sql, params = self._alter_column_default_sql(
 625                model, None, field, drop=True
 626            )
 627            sql = self.sql_alter_column % {
 628                "table": self.quote_name(model.model_options.db_table),
 629                "changes": changes_sql,
 630            }
 631            self.execute(sql, params)
 632        # Add field comment, if required.
 633        if (
 634            field.db_comment
 635            and self.connection.features.supports_comments
 636            and not self.connection.features.supports_comments_inline
 637        ):
 638            field_type = db_params["type"]
 639            assert field_type is not None
 640            self.execute(
 641                *self._alter_column_comment_sql(
 642                    model, field, field_type, field.db_comment
 643                )
 644            )
 645        # Add an index, if required
 646        self.deferred_sql.extend(self._field_indexes_sql(model, field))
 647
 648    def remove_field(self, model: type[Model], field: Field) -> None:
 649        """
 650        Remove a field from a model. Usually involves deleting a column,
 651        but for M2Ms may involve deleting a table.
 652        """
 653        # It might not actually have a column behind it
 654        if field.db_parameters(connection=self.connection)["type"] is None:
 655            return
 656        # Drop any FK constraints, MySQL requires explicit deletion
 657        if isinstance(field, RelatedField):
 658            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 659            for fk_name in fk_names:
 660                self.execute(self._delete_fk_sql(model, fk_name))
 661        # Delete the column
 662        sql = self.sql_delete_column % {
 663            "table": self.quote_name(model.model_options.db_table),
 664            "column": self.quote_name(field.column),
 665        }
 666        self.execute(sql)
 667        # Remove all deferred statements referencing the deleted column.
 668        for sql in list(self.deferred_sql):
 669            if isinstance(sql, Statement) and sql.references_column(
 670                model.model_options.db_table, field.column
 671            ):
 672                self.deferred_sql.remove(sql)
 673
 674    def alter_field(
 675        self,
 676        model: type[Model],
 677        old_field: Field,
 678        new_field: Field,
 679        strict: bool = False,
 680    ) -> None:
 681        """
 682        Allow a field's type, uniqueness, nullability, default, column,
 683        constraints, etc. to be modified.
 684        `old_field` is required to compute the necessary changes.
 685        If `strict` is True, raise errors if the old column does not match
 686        `old_field` precisely.
 687        """
 688        if not self._field_should_be_altered(old_field, new_field):
 689            return
 690        # Ensure this field is even column-based
 691        old_db_params = old_field.db_parameters(connection=self.connection)
 692        old_type = old_db_params["type"]
 693        new_db_params = new_field.db_parameters(connection=self.connection)
 694        new_type = new_db_params["type"]
 695        if (old_type is None and not isinstance(old_field, RelatedField)) or (
 696            new_type is None and not isinstance(new_field, RelatedField)
 697        ):
 698            raise ValueError(
 699                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 700                "db_type (are you using a badly-written custom field?)",
 701            )
 702        elif (
 703            old_type is None
 704            and new_type is None
 705            and isinstance(old_field, RelatedField)
 706            and isinstance(old_field.remote_field, ManyToManyRel)
 707            and isinstance(new_field, RelatedField)
 708            and isinstance(new_field.remote_field, ManyToManyRel)
 709        ):
 710            # Both sides have through models; this is a no-op.
 711            return
 712        elif old_type is None or new_type is None:
 713            raise ValueError(
 714                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 715                "(you cannot alter to or from M2M fields, or add or remove "
 716                "through= on M2M fields)"
 717            )
 718
 719        self._alter_field(
 720            model,
 721            old_field,
 722            new_field,
 723            old_type,
 724            new_type,
 725            old_db_params,
 726            new_db_params,
 727            strict,
 728        )
 729
 730    def _field_db_check(
 731        self, field: Field, field_db_params: DbParameters
 732    ) -> str | None:
 733        # Always check constraints with the same mocked column name to avoid
 734        # recreating constrains when the column is renamed.
 735        check_constraints = self.connection.data_type_check_constraints
 736        data = field.db_type_parameters(self.connection)
 737        data["column"] = "__column_name__"
 738        try:
 739            return check_constraints[field.get_internal_type()] % data
 740        except KeyError:
 741            return None
 742
 743    def _alter_field(
 744        self,
 745        model: type[Model],
 746        old_field: Field,
 747        new_field: Field,
 748        old_type: str,
 749        new_type: str,
 750        old_db_params: DbParameters,
 751        new_db_params: DbParameters,
 752        strict: bool = False,
 753    ) -> None:
 754        """Perform a "physical" (non-ManyToMany) field update."""
 755        # Drop any FK constraints, we'll remake them later
 756        fks_dropped = set()
 757        if (
 758            self.connection.features.supports_foreign_keys
 759            and isinstance(old_field, ForeignKeyField)
 760            and old_field.db_constraint
 761            and self._field_should_be_altered(
 762                old_field,
 763                new_field,
 764                ignore={"db_comment"},
 765            )
 766        ):
 767            fk_names = self._constraint_names(
 768                model, [old_field.column], foreign_key=True
 769            )
 770            if strict and len(fk_names) != 1:
 771                raise ValueError(
 772                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
 773                )
 774            for fk_name in fk_names:
 775                fks_dropped.add((old_field.column,))
 776                self.execute(self._delete_fk_sql(model, fk_name))
 777        # Has unique been removed?
 778        if old_field.primary_key and (
 779            not new_field.primary_key
 780            or self._field_became_primary_key(old_field, new_field)
 781        ):
 782            # Find the unique constraint for this field
 783            meta_constraint_names = {
 784                constraint.name for constraint in model.model_options.constraints
 785            }
 786            constraint_names = self._constraint_names(
 787                model,
 788                [old_field.column],
 789                unique=True,
 790                primary_key=False,
 791                exclude=meta_constraint_names,
 792            )
 793            if strict and len(constraint_names) != 1:
 794                raise ValueError(
 795                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
 796                )
 797            for constraint_name in constraint_names:
 798                sql = self._delete_unique_sql(model, constraint_name)
 799                if sql is not None:
 800                    self.execute(sql)
 801        # Drop incoming FK constraints if the field is a primary key or unique,
 802        # which might be a to_field target, and things are going to change.
 803        old_collation = old_db_params.get("collation")
 804        new_collation = new_db_params.get("collation")
 805        drop_foreign_keys = (
 806            self.connection.features.supports_foreign_keys
 807            and (old_field.primary_key and new_field.primary_key)
 808            and ((old_type != new_type) or (old_collation != new_collation))
 809        )
 810        if drop_foreign_keys:
 811            # '_model_meta.related_field' also contains M2M reverse fields, these
 812            # will be filtered out
 813            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
 814                rel_fk_names = self._constraint_names(
 815                    new_rel.related_model, [new_rel.field.column], foreign_key=True
 816                )
 817                for fk_name in rel_fk_names:
 818                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
 819        # Removed an index? (no strict check, as multiple indexes are possible)
 820        # Remove indexes if db_index switched to False or a unique constraint
 821        # will now be used in lieu of an index. The following lines from the
 822        # truth table show all True cases; the rest are False:
 823        #
 824        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 825        # ------------------------------------------------------------------------------
 826        # True               | False            | False              | False
 827        # True               | False            | False              | True
 828        # True               | False            | True               | True
 829        if (
 830            isinstance(old_field, ForeignKeyField)
 831            and old_field.db_index
 832            and not old_field.primary_key
 833            and (
 834                not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
 835                or new_field.primary_key
 836            )
 837        ):
 838            # Find the index for this field
 839            meta_index_names = {index.name for index in model.model_options.indexes}
 840            # Retrieve only BTREE indexes since this is what's created with
 841            # db_index=True.
 842            index_names = self._constraint_names(
 843                model,
 844                [old_field.column],
 845                index=True,
 846                type_=Index.suffix,
 847                exclude=meta_index_names,
 848            )
 849            for index_name in index_names:
 850                # The only way to check if an index was created with
 851                # db_index=True or with Index(['field'], name='foo')
 852                # is to look at its name (refs #28053).
 853                self.execute(self._delete_index_sql(model, index_name))
 854        # Change check constraints?
 855        old_db_check = self._field_db_check(old_field, old_db_params)
 856        new_db_check = self._field_db_check(new_field, new_db_params)
 857        if old_db_check != new_db_check and old_db_check:
 858            meta_constraint_names = {
 859                constraint.name for constraint in model.model_options.constraints
 860            }
 861            constraint_names = self._constraint_names(
 862                model,
 863                [old_field.column],
 864                check=True,
 865                exclude=meta_constraint_names,
 866            )
 867            if strict and len(constraint_names) != 1:
 868                raise ValueError(
 869                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
 870                )
 871            for constraint_name in constraint_names:
 872                sql = self._delete_check_sql(model, constraint_name)
 873                if sql is not None:
 874                    self.execute(sql)
 875        # Have they renamed the column?
 876        if old_field.column != new_field.column:
 877            self.execute(
 878                self._rename_field_sql(
 879                    model.model_options.db_table, old_field, new_field, new_type
 880                )
 881            )
 882            # Rename all references to the renamed column.
 883            for sql in self.deferred_sql:
 884                if isinstance(sql, Statement):
 885                    sql.rename_column_references(
 886                        model.model_options.db_table, old_field.column, new_field.column
 887                    )
 888        # Next, start accumulating actions to do
 889        actions = []
 890        null_actions = []
 891        post_actions = []
 892        # Type suffix change? (e.g. auto increment).
 893        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
 894        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
 895        # Type, collation, or comment change?
 896        if (
 897            old_type != new_type
 898            or old_type_suffix != new_type_suffix
 899            or old_collation != new_collation
 900            or (
 901                self.connection.features.supports_comments
 902                and old_field.db_comment != new_field.db_comment
 903            )
 904        ):
 905            fragment, other_actions = self._alter_column_type_sql(
 906                model, old_field, new_field, new_type, old_collation, new_collation
 907            )
 908            actions.append(fragment)
 909            post_actions.extend(other_actions)
 910        # When changing a column NULL constraint to NOT NULL with a given
 911        # default value, we need to perform 4 steps:
 912        #  1. Add a default for new incoming writes
 913        #  2. Update existing NULL rows with new default
 914        #  3. Replace NULL constraint with NOT NULL
 915        #  4. Drop the default again.
 916        # Default change?
 917        needs_database_default = False
 918        if old_field.allow_null and not new_field.allow_null:
 919            old_default = self.effective_default(old_field)
 920            new_default = self.effective_default(new_field)
 921            if (
 922                not self.skip_default_on_alter(new_field)
 923                and old_default != new_default
 924                and new_default is not None
 925            ):
 926                needs_database_default = True
 927                actions.append(
 928                    self._alter_column_default_sql(model, old_field, new_field)
 929                )
 930        # Nullability change?
 931        if old_field.allow_null != new_field.allow_null:
 932            fragment = self._alter_column_null_sql(model, old_field, new_field)
 933            if fragment:
 934                null_actions.append(fragment)
 935        # Only if we have a default and there is a change from NULL to NOT NULL
 936        four_way_default_alteration = new_field.has_default() and (
 937            old_field.allow_null and not new_field.allow_null
 938        )
 939        if actions or null_actions:
 940            if not four_way_default_alteration:
 941                # If we don't have to do a 4-way default alteration we can
 942                # directly run a (NOT) NULL alteration
 943                actions += null_actions
 944            # Combine actions together if we can (e.g. postgres)
 945            if self.connection.features.supports_combined_alters and actions:
 946                sql, params = tuple(zip(*actions))
 947                actions = [(", ".join(sql), sum(params, []))]
 948            # Apply those actions
 949            for sql, params in actions:
 950                self.execute(
 951                    self.sql_alter_column
 952                    % {
 953                        "table": self.quote_name(model.model_options.db_table),
 954                        "changes": sql,
 955                    },
 956                    params,
 957                )
 958            if four_way_default_alteration:
 959                # Update existing rows with default value
 960                self.execute(
 961                    self.sql_update_with_default
 962                    % {
 963                        "table": self.quote_name(model.model_options.db_table),
 964                        "column": self.quote_name(new_field.column),
 965                        "default": "%s",
 966                    },
 967                    [new_default],
 968                )
 969                # Since we didn't run a NOT NULL change before we need to do it
 970                # now
 971                for sql, params in null_actions:
 972                    self.execute(
 973                        self.sql_alter_column
 974                        % {
 975                            "table": self.quote_name(model.model_options.db_table),
 976                            "changes": sql,
 977                        },
 978                        params,
 979                    )
 980        if post_actions:
 981            for sql, params in post_actions:
 982                self.execute(sql, params)
 983        # If primary_key changed to False, delete the primary key constraint.
 984        if old_field.primary_key and not new_field.primary_key:
 985            self._delete_primary_key(model, strict)
 986
 987        # Added an index? Add an index if db_index switched to True or a unique
 988        # constraint will no longer be used in lieu of an index. The following
 989        # lines from the truth table show all True cases; the rest are False:
 990        #
 991        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 992        # ------------------------------------------------------------------------------
 993        # False              | False            | True               | False
 994        # False              | True             | True               | False
 995        # True               | True             | True               | False
 996        if (
 997            (
 998                not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
 999                or old_field.primary_key
1000            )
1001            and isinstance(new_field, ForeignKeyField)
1002            and new_field.db_index
1003            and not new_field.primary_key
1004        ):
1005            self.execute(self._create_index_sql(model, fields=[new_field]))
1006        # Type alteration on primary key? Then we need to alter the column
1007        # referring to us.
1008        rels_to_update = []
1009        if drop_foreign_keys:
1010            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1011        # Changed to become primary key?
1012        if self._field_became_primary_key(old_field, new_field):
1013            # Make the new one
1014            self.execute(self._create_primary_key_sql(model, new_field))
1015            # Update all referencing columns
1016            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1017        # Handle our type alters on the other end of rels from the PK stuff above
1018        for old_rel, new_rel in rels_to_update:
1019            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1020            rel_type = rel_db_params["type"]
1021            rel_collation = rel_db_params.get("collation")
1022            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1023            old_rel_collation = old_rel_db_params.get("collation")
1024            fragment, other_actions = self._alter_column_type_sql(
1025                new_rel.related_model,
1026                old_rel.field,
1027                new_rel.field,
1028                rel_type,
1029                old_rel_collation,
1030                rel_collation,
1031            )
1032            self.execute(
1033                self.sql_alter_column
1034                % {
1035                    "table": self.quote_name(
1036                        new_rel.related_model.model_options.db_table
1037                    ),
1038                    "changes": fragment[0],
1039                },
1040                fragment[1],
1041            )
1042            for sql, params in other_actions:
1043                self.execute(sql, params)
1044        # Does it have a foreign key?
1045        if (
1046            self.connection.features.supports_foreign_keys
1047            and isinstance(new_field, ForeignKeyField)
1048            and (
1049                fks_dropped
1050                or not isinstance(old_field, ForeignKeyField)
1051                or not old_field.db_constraint
1052            )
1053            and new_field.db_constraint
1054        ):
1055            self.execute(
1056                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1057            )
1058        # Rebuild FKs that pointed to us if we previously had to drop them
1059        if drop_foreign_keys:
1060            for _, rel in rels_to_update:
1061                if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
1062                    self.execute(
1063                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
1064                    )
1065        # Does it have check constraints we need to add?
1066        if old_db_check != new_db_check and new_db_check:
1067            constraint_name = self._create_index_name(
1068                model.model_options.db_table, [new_field.column], suffix="_check"
1069            )
1070            new_check = new_db_params["check"]
1071            assert new_check is not None  # Guaranteed by new_db_check check above
1072            sql = self._create_check_sql(model, constraint_name, new_check)
1073            if sql is not None:
1074                self.execute(sql)
1075        # Drop the default if we need to
1076        # (Plain usually does not use in-database defaults)
1077        if needs_database_default:
1078            changes_sql, params = self._alter_column_default_sql(
1079                model, old_field, new_field, drop=True
1080            )
1081            sql = self.sql_alter_column % {
1082                "table": self.quote_name(model.model_options.db_table),
1083                "changes": changes_sql,
1084            }
1085            self.execute(sql, params)
1086
1087    def _alter_column_null_sql(
1088        self, model: type[Model], old_field: Field, new_field: Field
1089    ) -> tuple[str, list[Any]]:
1090        """
1091        Hook to specialize column null alteration.
1092
1093        Return a (sql, params) fragment to set a column to null or non-null
1094        as required by new_field, or None if no changes are required.
1095        """
1096        new_db_params = new_field.db_parameters(connection=self.connection)
1097        sql = (
1098            self.sql_alter_column_null
1099            if new_field.allow_null
1100            else self.sql_alter_column_not_null
1101        )
1102        return (
1103            sql
1104            % {
1105                "column": self.quote_name(new_field.column),
1106                "type": new_db_params["type"],
1107            },
1108            [],
1109        )
1110
1111    def _alter_column_default_sql(
1112        self,
1113        model: type[Model],
1114        old_field: Field | None,
1115        new_field: Field,
1116        drop: bool = False,
1117    ) -> tuple[str, list[Any]]:
1118        """
1119        Hook to specialize column default alteration.
1120
1121        Return a (sql, params) fragment to add or drop (depending on the drop
1122        argument) a default to new_field's column.
1123        """
1124        new_default = self.effective_default(new_field)
1125        default = self._column_default_sql(new_field)
1126        params = [new_default]
1127
1128        if drop:
1129            params = []
1130        elif self.connection.features.requires_literal_defaults:
1131            # Some databases (Oracle) can't take defaults as a parameter
1132            # If this is the case, the SchemaEditor for that database should
1133            # implement prepare_default().
1134            default = self.prepare_default(new_default)
1135            params = []
1136
1137        new_db_params = new_field.db_parameters(connection=self.connection)
1138        if drop:
1139            if new_field.allow_null:
1140                sql = self.sql_alter_column_no_default_null
1141            else:
1142                sql = self.sql_alter_column_no_default
1143        else:
1144            sql = self.sql_alter_column_default
1145        return (
1146            sql
1147            % {
1148                "column": self.quote_name(new_field.column),
1149                "type": new_db_params["type"],
1150                "default": default,
1151            },
1152            params,
1153        )
1154
1155    def _alter_column_type_sql(
1156        self,
1157        model: type[Model],
1158        old_field: Field,
1159        new_field: Field,
1160        new_type: str,
1161        old_collation: str | None,
1162        new_collation: str | None,
1163    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1164        """
1165        Hook to specialize column type alteration for different backends,
1166        for cases when a creation type is different to an alteration type
1167        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1168
1169        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1170        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1171        run once the field is altered.
1172        """
1173        other_actions = []
1174        if collate_sql := self._collate_sql(
1175            new_collation, old_collation, model.model_options.db_table
1176        ):
1177            collate_sql = f" {collate_sql}"
1178        else:
1179            collate_sql = ""
1180        # Comment change?
1181        from plain.models.fields.related import ManyToManyField
1182
1183        comment_sql = ""
1184        if self.connection.features.supports_comments and not isinstance(
1185            new_field, ManyToManyField
1186        ):
1187            if old_field.db_comment != new_field.db_comment:
1188                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1189                # 'COMMENT ON ...' at the same time.
1190                sql, params = self._alter_column_comment_sql(
1191                    model, new_field, new_type, new_field.db_comment
1192                )
1193                if sql:
1194                    other_actions.append((sql, params))
1195            if new_field.db_comment:
1196                comment_sql = self._comment_sql(new_field.db_comment)
1197        return (
1198            (
1199                self.sql_alter_column_type
1200                % {
1201                    "column": self.quote_name(new_field.column),
1202                    "type": new_type,
1203                    "collation": collate_sql,
1204                    "comment": comment_sql,
1205                },
1206                [],
1207            ),
1208            other_actions,
1209        )
1210
1211    def _alter_column_comment_sql(
1212        self,
1213        model: type[Model],
1214        new_field: Field,
1215        new_type: str,
1216        new_db_comment: str | None,
1217    ) -> tuple[str, list[Any]]:
1218        return (
1219            self.sql_alter_column_comment
1220            % {
1221                "table": self.quote_name(model.model_options.db_table),
1222                "column": self.quote_name(new_field.column),
1223                "comment": self._comment_sql(new_db_comment),
1224            },
1225            [],
1226        )
1227
1228    def _comment_sql(self, comment: str | None) -> str:
1229        return self.quote_value(comment or "")
1230
1231    def _alter_many_to_many(
1232        self,
1233        model: type[Model],
1234        old_field: ManyToManyField,
1235        new_field: ManyToManyField,
1236        strict: bool,
1237    ) -> None:
1238        """Alter M2Ms to repoint their to= endpoints."""
1239        # Type narrow for ManyToManyField.remote_field
1240        old_rel: ManyToManyRel = old_field.remote_field
1241        new_rel: ManyToManyRel = new_field.remote_field
1242
1243        # Rename the through table
1244        if (
1245            old_rel.through.model_options.db_table
1246            != new_rel.through.model_options.db_table
1247        ):
1248            self.alter_db_table(
1249                old_rel.through,
1250                old_rel.through.model_options.db_table,
1251                new_rel.through.model_options.db_table,
1252            )
1253        # Repoint the FK to the other side
1254        old_reverse_field = old_rel.through._model_meta.get_forward_field(
1255            old_field.m2m_reverse_field_name()
1256        )
1257        new_reverse_field = new_rel.through._model_meta.get_forward_field(
1258            new_field.m2m_reverse_field_name()
1259        )
1260        self.alter_field(
1261            new_rel.through,
1262            # The field that points to the target model is needed, so we can
1263            # tell alter_field to change it - this is m2m_reverse_field_name()
1264            # (as opposed to m2m_field_name(), which points to our model).
1265            old_reverse_field,
1266            new_reverse_field,
1267        )
1268        old_m2m_field = old_rel.through._model_meta.get_forward_field(
1269            old_field.m2m_field_name()
1270        )
1271        new_m2m_field = new_rel.through._model_meta.get_forward_field(
1272            new_field.m2m_field_name()
1273        )
1274        self.alter_field(
1275            new_rel.through,
1276            # for self-referential models we need to alter field from the other end too
1277            old_m2m_field,
1278            new_m2m_field,
1279        )
1280
1281    def _create_index_name(
1282        self, table_name: str, column_names: list[str], suffix: str = ""
1283    ) -> str:
1284        """
1285        Generate a unique name for an index/unique constraint.
1286
1287        The name is divided into 3 parts: the table name, the column names,
1288        and a unique digest and suffix.
1289        """
1290        _, table_name = split_identifier(table_name)
1291        hash_suffix_part = (
1292            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1293        )
1294        max_length = self.connection.ops.max_name_length() or 200
1295        # If everything fits into max_length, use that name.
1296        index_name = "{}_{}_{}".format(
1297            table_name, "_".join(column_names), hash_suffix_part
1298        )
1299        if len(index_name) <= max_length:
1300            return index_name
1301        # Shorten a long suffix.
1302        if len(hash_suffix_part) > max_length / 3:
1303            hash_suffix_part = hash_suffix_part[: max_length // 3]
1304        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1305        index_name = "{}_{}_{}".format(
1306            table_name[:other_length],
1307            "_".join(column_names)[:other_length],
1308            hash_suffix_part,
1309        )
1310        # Prepend D if needed to prevent the name from starting with an
1311        # underscore or a number (not permitted on Oracle).
1312        if index_name[0] == "_" or index_name[0].isdigit():
1313            index_name = f"D{index_name[:-1]}"
1314        return index_name
1315
1316    def _index_condition_sql(self, condition: str | None) -> str:
1317        if condition:
1318            return " WHERE " + condition
1319        return ""
1320
1321    def _index_include_sql(
1322        self, model: type[Model], columns: list[str] | None
1323    ) -> str | Statement:
1324        if not columns or not self.connection.features.supports_covering_indexes:
1325            return ""
1326        return Statement(
1327            " INCLUDE (%(columns)s)",
1328            columns=Columns(model.model_options.db_table, columns, self.quote_name),
1329        )
1330
1331    def _create_index_sql(
1332        self,
1333        model: type[Model],
1334        *,
1335        fields: list[Field] | None = None,
1336        name: str | None = None,
1337        suffix: str = "",
1338        using: str = "",
1339        col_suffixes: tuple[str, ...] = (),
1340        sql: str | None = None,
1341        opclasses: tuple[str, ...] = (),
1342        condition: str | None = None,
1343        include: list[str] | None = None,
1344        expressions: Any = None,
1345    ) -> Statement:
1346        """
1347        Return the SQL statement to create the index for one or several fields
1348        or expressions. `sql` can be specified if the syntax differs from the
1349        standard (GIS indexes, ...).
1350        """
1351        fields = fields or []
1352        expressions = expressions or []
1353        compiler = Query(model, alias_cols=False).get_compiler()
1354        columns = [field.column for field in fields]
1355        sql_create_index = sql or self.sql_create_index
1356        table = model.model_options.db_table
1357
1358        def create_index_name(*args: Any, **kwargs: Any) -> str:
1359            nonlocal name
1360            if name is None:
1361                name = self._create_index_name(*args, **kwargs)
1362            return self.quote_name(name)
1363
1364        return Statement(
1365            sql_create_index,
1366            table=Table(table, self.quote_name),
1367            name=IndexName(table, columns, suffix, create_index_name),
1368            using=using,
1369            columns=(
1370                self._index_columns(table, columns, col_suffixes, opclasses)
1371                if columns
1372                else Expressions(table, expressions, compiler, self.quote_value)
1373            ),
1374            extra="",
1375            condition=self._index_condition_sql(condition),
1376            include=self._index_include_sql(model, include),
1377        )
1378
1379    def _delete_index_sql(
1380        self, model: type[Model], name: str, sql: str | None = None
1381    ) -> Statement:
1382        return Statement(
1383            sql or self.sql_delete_index,
1384            table=Table(model.model_options.db_table, self.quote_name),
1385            name=self.quote_name(name),
1386        )
1387
1388    def _rename_index_sql(
1389        self, model: type[Model], old_name: str, new_name: str
1390    ) -> Statement:
1391        return Statement(
1392            self.sql_rename_index,
1393            table=Table(model.model_options.db_table, self.quote_name),
1394            old_name=self.quote_name(old_name),
1395            new_name=self.quote_name(new_name),
1396        )
1397
1398    def _index_columns(
1399        self,
1400        table: str,
1401        columns: list[str],
1402        col_suffixes: tuple[str, ...],
1403        opclasses: tuple[str, ...],
1404    ) -> Columns:
1405        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1406
1407    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1408        """
1409        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1410        """
1411        output: list[Statement | None] = []
1412        for field in model._model_meta.local_fields:
1413            output.extend(self._field_indexes_sql(model, field))
1414
1415        for index in model.model_options.indexes:
1416            if (
1417                not index.contains_expressions
1418                or self.connection.features.supports_expression_indexes
1419            ):
1420                output.append(index.create_sql(model, self))
1421        return output
1422
1423    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1424        """
1425        Return a list of all index SQL statements for the specified field.
1426        """
1427        output: list[Statement] = []
1428        if self._field_should_be_indexed(model, field):
1429            output.append(self._create_index_sql(model, fields=[field]))
1430        return output
1431
1432    def _field_should_be_altered(
1433        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1434    ) -> bool:
1435        ignore = ignore or set()
1436        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1437        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1438        # Don't alter when:
1439        # - changing only a field name
1440        # - changing an attribute that doesn't affect the schema
1441        # - changing an attribute in the provided set of ignored attributes
1442        # - adding only a db_column and the column name is not changed
1443        for attr in ignore.union(old_field.non_db_attrs):
1444            old_kwargs.pop(attr, None)
1445        for attr in ignore.union(new_field.non_db_attrs):
1446            new_kwargs.pop(attr, None)
1447        return self.quote_name(old_field.column) != self.quote_name(
1448            new_field.column
1449        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1450
1451    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1452        if isinstance(field, ForeignKeyField):
1453            return bool(field.remote_field) and field.db_index and not field.primary_key
1454        return False
1455
1456    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1457        return not old_field.primary_key and new_field.primary_key
1458
1459    def _rename_field_sql(
1460        self, table: str, old_field: Field, new_field: Field, new_type: str
1461    ) -> str:
1462        return self.sql_rename_column % {
1463            "table": self.quote_name(table),
1464            "old_column": self.quote_name(old_field.column),
1465            "new_column": self.quote_name(new_field.column),
1466            "type": new_type,
1467        }
1468
1469    def _create_fk_sql(
1470        self, model: type[Model], field: ForeignKeyField, suffix: str
1471    ) -> Statement:
1472        table = Table(model.model_options.db_table, self.quote_name)
1473        name = self._fk_constraint_name(model, field, suffix)
1474        column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1475        to_table = Table(
1476            field.target_field.model.model_options.db_table, self.quote_name
1477        )
1478        to_column = Columns(
1479            field.target_field.model.model_options.db_table,
1480            [field.target_field.column],
1481            self.quote_name,
1482        )
1483        deferrable = self.connection.ops.deferrable_sql()
1484        return Statement(
1485            self.sql_create_fk,
1486            table=table,
1487            name=name,
1488            column=column,
1489            to_table=to_table,
1490            to_column=to_column,
1491            deferrable=deferrable,
1492        )
1493
1494    def _fk_constraint_name(
1495        self, model: type[Model], field: ForeignKeyField, suffix: str
1496    ) -> ForeignKeyName:
1497        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1498            return self.quote_name(self._create_index_name(*args, **kwargs))
1499
1500        return ForeignKeyName(
1501            model.model_options.db_table,
1502            [field.column],
1503            split_identifier(field.target_field.model.model_options.db_table)[1],
1504            [field.target_field.column],
1505            suffix,
1506            create_fk_name,
1507        )
1508
1509    def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1510        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1511
1512    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1513        if deferrable is None:
1514            return ""
1515        if deferrable == Deferrable.DEFERRED:
1516            return " DEFERRABLE INITIALLY DEFERRED"
1517        if deferrable == Deferrable.IMMEDIATE:
1518            return " DEFERRABLE INITIALLY IMMEDIATE"
1519        return ""
1520
1521    def _unique_sql(
1522        self,
1523        model: type[Model],
1524        fields: Iterable[Field],
1525        name: str,
1526        condition: str | None = None,
1527        deferrable: Deferrable | None = None,
1528        include: list[str] | None = None,
1529        opclasses: tuple[str, ...] | None = None,
1530        expressions: Any = None,
1531    ) -> str | None:
1532        if (
1533            deferrable
1534            and not self.connection.features.supports_deferrable_unique_constraints
1535        ):
1536            return None
1537        if condition or include or opclasses or expressions:
1538            # Databases support conditional, covering, and functional unique
1539            # constraints via a unique index.
1540            sql = self._create_unique_sql(
1541                model,
1542                fields,
1543                name=name,
1544                condition=condition,
1545                include=include,
1546                opclasses=opclasses,
1547                expressions=expressions,
1548            )
1549            if sql:
1550                self.deferred_sql.append(sql)
1551            return None
1552        constraint = self.sql_unique_constraint % {
1553            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1554            "deferrable": self._deferrable_constraint_sql(deferrable),
1555        }
1556        return self.sql_constraint % {
1557            "name": self.quote_name(name),
1558            "constraint": constraint,
1559        }
1560
1561    def _create_unique_sql(
1562        self,
1563        model: type[Model],
1564        fields: Iterable[Field],
1565        name: str | None = None,
1566        condition: str | None = None,
1567        deferrable: Deferrable | None = None,
1568        include: list[str] | None = None,
1569        opclasses: tuple[str, ...] | None = None,
1570        expressions: Any = None,
1571    ) -> Statement | None:
1572        if (
1573            (
1574                deferrable
1575                and not self.connection.features.supports_deferrable_unique_constraints
1576            )
1577            or (condition and not self.connection.features.supports_partial_indexes)
1578            or (include and not self.connection.features.supports_covering_indexes)
1579            or (
1580                expressions and not self.connection.features.supports_expression_indexes
1581            )
1582        ):
1583            return None
1584
1585        compiler = Query(model, alias_cols=False).get_compiler()
1586        table = model.model_options.db_table
1587        columns = [field.column for field in fields]
1588        constraint_name: IndexName | str
1589        if name is None:
1590            constraint_name = self._unique_constraint_name(table, columns, quote=True)
1591        else:
1592            constraint_name = self.quote_name(name)
1593        if condition or include or opclasses or expressions:
1594            sql = self.sql_create_unique_index
1595        else:
1596            sql = self.sql_create_unique
1597        if columns:
1598            columns_obj: Columns | Expressions = self._index_columns(
1599                table, columns, col_suffixes=(), opclasses=opclasses or ()
1600            )
1601        else:
1602            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1603        return Statement(
1604            sql,
1605            table=Table(table, self.quote_name),
1606            name=constraint_name,
1607            columns=columns_obj,
1608            condition=self._index_condition_sql(condition),
1609            deferrable=self._deferrable_constraint_sql(deferrable),
1610            include=self._index_include_sql(model, include),
1611        )
1612
1613    def _unique_constraint_name(
1614        self, table: str, columns: list[str], quote: bool = True
1615    ) -> IndexName | str:
1616        if quote:
1617
1618            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1619                return self.quote_name(self._create_index_name(*args, **kwargs))
1620
1621        else:
1622            create_unique_name = self._create_index_name
1623
1624        return IndexName(table, columns, "_uniq", create_unique_name)
1625
1626    def _delete_unique_sql(
1627        self,
1628        model: type[Model],
1629        name: str,
1630        condition: str | None = None,
1631        deferrable: Deferrable | None = None,
1632        include: list[str] | None = None,
1633        opclasses: tuple[str, ...] | None = None,
1634        expressions: Any = None,
1635    ) -> Statement | None:
1636        if (
1637            (
1638                deferrable
1639                and not self.connection.features.supports_deferrable_unique_constraints
1640            )
1641            or (condition and not self.connection.features.supports_partial_indexes)
1642            or (include and not self.connection.features.supports_covering_indexes)
1643            or (
1644                expressions and not self.connection.features.supports_expression_indexes
1645            )
1646        ):
1647            return None
1648        if condition or include or opclasses or expressions:
1649            sql = self.sql_delete_index
1650        else:
1651            sql = self.sql_delete_unique
1652        return self._delete_constraint_sql(sql, model, name)
1653
1654    def _check_sql(self, name: str, check: str) -> str:
1655        return self.sql_constraint % {
1656            "name": self.quote_name(name),
1657            "constraint": self.sql_check_constraint % {"check": check},
1658        }
1659
1660    def _create_check_sql(
1661        self, model: type[Model], name: str, check: str
1662    ) -> Statement | None:
1663        if not self.connection.features.supports_table_check_constraints:
1664            return None
1665        return Statement(
1666            self.sql_create_check,
1667            table=Table(model.model_options.db_table, self.quote_name),
1668            name=self.quote_name(name),
1669            check=check,
1670        )
1671
1672    def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1673        if not self.connection.features.supports_table_check_constraints:
1674            return None
1675        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1676
1677    def _delete_constraint_sql(
1678        self, template: str, model: type[Model], name: str
1679    ) -> Statement:
1680        return Statement(
1681            template,
1682            table=Table(model.model_options.db_table, self.quote_name),
1683            name=self.quote_name(name),
1684        )
1685
1686    def _constraint_names(
1687        self,
1688        model: type[Model],
1689        column_names: list[str] | None = None,
1690        unique: bool | None = None,
1691        primary_key: bool | None = None,
1692        index: bool | None = None,
1693        foreign_key: bool | None = None,
1694        check: bool | None = None,
1695        type_: str | None = None,
1696        exclude: set[str] | None = None,
1697    ) -> list[str]:
1698        """Return all constraint names matching the columns and conditions."""
1699        if column_names is not None:
1700            column_names = [
1701                self.connection.introspection.identifier_converter(
1702                    truncate_name(name, self.connection.ops.max_name_length())
1703                )
1704                if self.connection.features.truncates_names
1705                else self.connection.introspection.identifier_converter(name)
1706                for name in column_names
1707            ]
1708        with self.connection.cursor() as cursor:
1709            constraints = self.connection.introspection.get_constraints(
1710                cursor, model.model_options.db_table
1711            )
1712        result: list[str] = []
1713        for name, infodict in constraints.items():
1714            if column_names is None or column_names == infodict["columns"]:
1715                if unique is not None and infodict["unique"] != unique:
1716                    continue
1717                if primary_key is not None and infodict["primary_key"] != primary_key:
1718                    continue
1719                if index is not None and infodict["index"] != index:
1720                    continue
1721                if check is not None and infodict["check"] != check:
1722                    continue
1723                if foreign_key is not None and not infodict["foreign_key"]:
1724                    continue
1725                if type_ is not None and infodict["type"] != type_:
1726                    continue
1727                if not exclude or name not in exclude:
1728                    result.append(name)
1729        return result
1730
1731    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1732        constraint_names = self._constraint_names(model, primary_key=True)
1733        if strict and len(constraint_names) != 1:
1734            raise ValueError(
1735                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1736            )
1737        for constraint_name in constraint_names:
1738            self.execute(self._delete_primary_key_sql(model, constraint_name))
1739
1740    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1741        return Statement(
1742            self.sql_create_pk,
1743            table=Table(model.model_options.db_table, self.quote_name),
1744            name=self.quote_name(
1745                self._create_index_name(
1746                    model.model_options.db_table, [field.column], suffix="_pk"
1747                )
1748            ),
1749            columns=Columns(
1750                model.model_options.db_table, [field.column], self.quote_name
1751            ),
1752        )
1753
1754    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1755        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1756
1757    def _collate_sql(
1758        self,
1759        collation: str | None,
1760        old_collation: str | None = None,
1761        table_name: str | None = None,
1762    ) -> str:
1763        return "COLLATE " + self.quote_name(collation) if collation else ""