1from __future__ import annotations
   2
   3import logging
   4import operator
   5from abc import ABC, abstractmethod
   6from collections.abc import Generator
   7from datetime import datetime
   8from typing import TYPE_CHECKING, Any
   9
  10if TYPE_CHECKING:
  11    from typing import Self
  12
  13from plain.models.backends.ddl_references import (
  14    Columns,
  15    Expressions,
  16    ForeignKeyName,
  17    IndexName,
  18    Statement,
  19    Table,
  20)
  21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  22from plain.models.constraints import Deferrable
  23from plain.models.fields import DbParameters, Field
  24from plain.models.fields.related import ForeignKeyField, RelatedField
  25from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
  26from plain.models.indexes import Index
  27from plain.models.sql.query import Query
  28from plain.models.transaction import TransactionManagementError, atomic
  29from plain.utils import timezone
  30
  31if TYPE_CHECKING:
  32    from collections.abc import Iterable
  33
  34    from plain.models.backends.base.base import BaseDatabaseWrapper
  35    from plain.models.base import Model
  36    from plain.models.constraints import BaseConstraint
  37    from plain.models.fields import Field
  38    from plain.models.fields.related import ForeignKeyField, ManyToManyField
  39    from plain.models.fields.reverse_related import ManyToManyRel
  40
  41logger = logging.getLogger("plain.models.backends.schema")
  42
  43
  44def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
  45    """
  46    When altering the given field, must constraints on its model from the given
  47    relation be temporarily dropped?
  48    """
  49    from plain.models.fields.related import ManyToManyField
  50
  51    field = relation.field
  52    if isinstance(field, ManyToManyField):
  53        # M2M reverse field
  54        return False
  55    if altered_field.primary_key:
  56        # Foreign key constraint on the primary key, which is being altered.
  57        return True
  58    # ForeignKeyField always targets 'id'
  59    return altered_field.name == "id"
  60
  61
  62def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
  63    # Related fields must be returned in a deterministic order.
  64    return sorted(
  65        model._model_meta._get_fields(
  66            forward=False,
  67            reverse=True,
  68        ),
  69        key=operator.attrgetter("name"),
  70    )
  71
  72
  73def _related_non_m2m_objects(
  74    old_field: Field, new_field: Field
  75) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
  76    # Filter out m2m objects from reverse relations.
  77    # Return (old_relation, new_relation) tuples.
  78    related_fields = zip(
  79        (
  80            obj
  81            for obj in _all_related_fields(old_field.model)
  82            if _is_relevant_relation(obj, old_field)
  83        ),
  84        (
  85            obj
  86            for obj in _all_related_fields(new_field.model)
  87            if _is_relevant_relation(obj, new_field)
  88        ),
  89    )
  90    for old_rel, new_rel in related_fields:
  91        yield old_rel, new_rel
  92        yield from _related_non_m2m_objects(
  93            old_rel.remote_field,
  94            new_rel.remote_field,
  95        )
  96
  97
  98class BaseDatabaseSchemaEditor(ABC):
  99    """
 100    This class and its subclasses are responsible for emitting schema-changing
 101    statements to the databases - model creation/removal/alteration, field
 102    renaming, index fiddling, and so on.
 103    """
 104
    # Overrideable SQL templates
    #
    # Each template is a %-format string with named placeholders such as
    # %(table)s; the methods of this class fill them in with a dict via the
    # `%` operator.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be placed in the %(changes)s slot of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # By default, dropping a default on a nullable column uses the same SQL.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic DROP CONSTRAINT statement.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints (deletion reuses the generic DROP CONSTRAINT).
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints (deletion reuses the generic DROP CONSTRAINT).
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The two inline templates are None here; table_sql() and
    # add_field() only emit an inline FK clause when the corresponding
    # template is set, and otherwise defer FK creation.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys (deletion reuses the generic DROP CONSTRAINT).
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint
 163
 164    def __init__(
 165        self,
 166        connection: BaseDatabaseWrapper,
 167        atomic: bool = True,
 168    ):
 169        self.connection = connection
 170        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 171
 172    # State-managing methods
 173
 174    def __enter__(self) -> Self:
 175        self.deferred_sql: list[Any] = []
 176        self.executed_sql: list[str] = []
 177        if self.atomic_migration:
 178            self.atomic = atomic()
 179            self.atomic.__enter__()
 180        return self
 181
 182    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 183        if exc_type is None:
 184            for sql in self.deferred_sql:
 185                self.execute(sql)
 186        if self.atomic_migration:
 187            self.atomic.__exit__(exc_type, exc_value, traceback)
 188
 189    # Core utility functions
 190
 191    def execute(
 192        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 193    ) -> None:
 194        """Execute the given SQL statement, with optional parameters."""
 195        if (
 196            self.connection.in_atomic_block
 197            and not self.connection.features.can_rollback_ddl
 198        ):
 199            raise TransactionManagementError(
 200                "Executing DDL statements while in a transaction on databases "
 201                "that can't perform a rollback is prohibited."
 202            )
 203        # Account for non-string statement objects.
 204        sql = str(sql)
 205        # Log the command we're running, then run it
 206        logger.debug(
 207            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 208        )
 209
 210        # Track executed SQL for display in migration output
 211        # Store the SQL for display (interpolate params for readability)
 212        if params:
 213            self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
 214        else:
 215            self.executed_sql.append(sql)
 216
 217        with self.connection.cursor() as cursor:
 218            cursor.execute(sql, params)
 219
 220    def quote_name(self, name: str) -> str:
 221        return self.connection.ops.quote_name(name)
 222
    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
        """
        Take a model and return its table definition.

        Returns a (sql, params) pair: the statement built from
        `sql_create_table` and the parameters collected from the column
        definitions. As a side effect, foreign key constraints that are not
        declared inline are appended to `self.deferred_sql`.

        Raises ValueError if a constrained foreign key's remote `field_name`
        is None.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._model_meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            if definition is None:
                # column_sql() found no column type for this field; skip it.
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += f" {col_type_suffix}"
            if extra_params:
                params.extend(extra_params)
            # FK.
            if isinstance(field, ForeignKeyField) and field.db_constraint:
                to_table = field.remote_field.model.model_options.db_table
                field_name = field.remote_field.field_name
                if field_name is None:
                    raise ValueError("Foreign key field_name cannot be None")
                to_field = field.remote_field.model._model_meta.get_forward_field(
                    field_name
                )
                to_column = to_field.column
                if self.sql_create_inline_fk:
                    # Inline variant: the FK clause is part of the column
                    # definition itself.
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    # No inline template: create the constraint later, when
                    # the deferred SQL is executed.
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
        # Constraints declared in the model options are rendered inline too.
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model.model_options.constraints
        ]
        sql = self.sql_create_table % {
            "table": self.quote_name(model.model_options.db_table),
            # Falsy entries (e.g. a constraint that rendered no SQL) are
            # dropped before joining.
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        return sql, params
 279
 280    # Field <-> database mapping functions
 281
 282    def _iter_column_sql(
 283        self,
 284        column_db_type: str,
 285        params: list[Any],
 286        model: type[Model],
 287        field: Field,
 288        field_db_params: DbParameters,
 289        include_default: bool,
 290    ) -> Generator[str, None, None]:
 291        yield column_db_type
 292        # Work out nullability.
 293        null = field.allow_null
 294        # Include a default value, if requested.
 295        include_default = (
 296            include_default
 297            and not self.skip_default(field)
 298            and
 299            # Don't include a default value if it's a nullable field and the
 300            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 301            # MySQL longtext and longblob).
 302            not (null and self.skip_default_on_alter(field))
 303        )
 304        if include_default:
 305            default_value = self.effective_default(field)
 306            if default_value is not None:
 307                column_default = "DEFAULT " + self._column_default_sql(field)
 308                if self.connection.features.requires_literal_defaults:
 309                    # Some databases can't take defaults as a parameter (Oracle).
 310                    # If this is the case, the individual schema backend should
 311                    # implement prepare_default().
 312                    yield column_default % self.prepare_default(default_value)
 313                else:
 314                    yield column_default
 315                    params.append(default_value)
 316
 317        if not null:
 318            yield "NOT NULL"
 319        else:
 320            yield "NULL"
 321
 322        if field.primary_key:
 323            yield "PRIMARY KEY"
 324
 325    def column_sql(
 326        self, model: type[Model], field: Field, include_default: bool = False
 327    ) -> tuple[str | None, list[Any] | None]:
 328        """
 329        Return the column definition for a field. The field must already have
 330        had set_attributes_from_name() called.
 331        """
 332        # Get the column's type and use that as the basis of the SQL.
 333        field_db_params = field.db_parameters(connection=self.connection)
 334        column_db_type = field_db_params["type"]
 335        # Check for fields that aren't actually columns (e.g. M2M).
 336        if column_db_type is None:
 337            return None, None
 338        params: list[Any] = []
 339        return (
 340            " ".join(
 341                # This appends to the params being returned.
 342                self._iter_column_sql(
 343                    column_db_type,
 344                    params,
 345                    model,
 346                    field,
 347                    field_db_params,
 348                    include_default,
 349                )
 350            ),
 351            params,
 352        )
 353
 354    def skip_default(self, field: Field) -> bool:
 355        """
 356        Some backends don't accept default values for certain columns types
 357        (i.e. MySQL longtext and longblob).
 358        """
 359        return False
 360
 361    def skip_default_on_alter(self, field: Field) -> bool:
 362        """
 363        Some backends don't accept default values for certain columns types
 364        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 365        """
 366        return False
 367
 368    def prepare_default(self, value: Any) -> str:
 369        """
 370        Only used for backends which have requires_literal_defaults feature
 371        """
 372        raise NotImplementedError(
 373            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 374            "requires_literal_defaults must provide a prepare_default() method"
 375        )
 376
 377    def _column_default_sql(self, field: Field) -> str:
 378        """
 379        Return the SQL to use in a DEFAULT clause. The resulting string should
 380        contain a '%s' placeholder for a default value.
 381        """
 382        return "%s"
 383
 384    @staticmethod
 385    def _effective_default(field: Field) -> Any:
 386        # This method allows testing its logic without a connection.
 387        if field.has_default():
 388            default = field.get_default()
 389        elif (
 390            not field.allow_null and not field.required and field.empty_strings_allowed
 391        ):
 392            if field.get_internal_type() == "BinaryField":
 393                default = b""
 394            else:
 395                default = ""
 396        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 397            internal_type = field.get_internal_type()
 398            if internal_type == "DateTimeField":
 399                default = timezone.now()
 400            else:
 401                default = datetime.now()
 402                if internal_type == "DateField":
 403                    default = default.date()
 404                elif internal_type == "TimeField":
 405                    default = default.time()
 406        else:
 407            default = None
 408        return default
 409
 410    def effective_default(self, field: Field) -> Any:
 411        """Return a field's effective database default value."""
 412        return field.get_db_prep_save(self._effective_default(field), self.connection)
 413
 414    @abstractmethod
 415    def quote_value(self, value: Any) -> str:
 416        """
 417        Return a quoted version of the value so it's safe to use in an SQL
 418        string. This is not safe against injection from user code; it is
 419        intended only for use in making SQL scripts or preparing default values
 420        for particularly tricky backends (defaults are not user-defined, though,
 421        so this is safe).
 422        """
 423        ...
 424
 425    # Actions
 426
 427    def create_model(self, model: type[Model]) -> None:
 428        """
 429        Create a table and any accompanying indexes or unique constraints for
 430        the given `model`.
 431        """
 432        sql, params = self.table_sql(model)
 433        # Prevent using [] as params, in the case a literal '%' is used in the
 434        # definition.
 435        self.execute(sql, params or None)
 436
 437        # Add any field index (deferred as SQLite _remake_table needs it).
 438        self.deferred_sql.extend(self._model_indexes_sql(model))
 439
 440    def delete_model(self, model: type[Model]) -> None:
 441        """Delete a model from the database."""
 442
 443        # Delete the table
 444        self.execute(
 445            self.sql_delete_table
 446            % {
 447                "table": self.quote_name(model.model_options.db_table),
 448            }
 449        )
 450        # Remove all deferred statements referencing the deleted table.
 451        for sql in list(self.deferred_sql):
 452            if isinstance(sql, Statement) and sql.references_table(
 453                model.model_options.db_table
 454            ):
 455                self.deferred_sql.remove(sql)
 456
 457    def add_index(self, model: type[Model], index: Index) -> None:
 458        """Add an index on a model."""
 459        if (
 460            index.contains_expressions
 461            and not self.connection.features.supports_expression_indexes
 462        ):
 463            return None
 464        # Index.create_sql returns interpolated SQL which makes params=None a
 465        # necessity to avoid escaping attempts on execution.
 466        self.execute(index.create_sql(model, self), params=None)
 467
 468    def remove_index(self, model: type[Model], index: Index) -> None:
 469        """Remove an index from a model."""
 470        if (
 471            index.contains_expressions
 472            and not self.connection.features.supports_expression_indexes
 473        ):
 474            return None
 475        self.execute(index.remove_sql(model, self))
 476
 477    def rename_index(
 478        self, model: type[Model], old_index: Index, new_index: Index
 479    ) -> None:
 480        if self.connection.features.can_rename_index:
 481            self.execute(
 482                self._rename_index_sql(model, old_index.name, new_index.name),
 483                params=None,
 484            )
 485        else:
 486            self.remove_index(model, old_index)
 487            self.add_index(model, new_index)
 488
 489    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 490        """Add a constraint to a model."""
 491        sql = constraint.create_sql(model, self)
 492        if sql:
 493            # Constraint.create_sql returns interpolated SQL which makes
 494            # params=None a necessity to avoid escaping attempts on execution.
 495            self.execute(sql, params=None)
 496
 497    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 498        """Remove a constraint from a model."""
 499        sql = constraint.remove_sql(model, self)
 500        if sql:
 501            self.execute(sql)
 502
 503    def alter_db_table(
 504        self, model: type[Model], old_db_table: str, new_db_table: str
 505    ) -> None:
 506        """Rename the table a model points to."""
 507        if old_db_table == new_db_table or (
 508            self.connection.features.ignores_table_name_case
 509            and old_db_table.lower() == new_db_table.lower()
 510        ):
 511            return
 512        self.execute(
 513            self.sql_rename_table
 514            % {
 515                "old_table": self.quote_name(old_db_table),
 516                "new_table": self.quote_name(new_db_table),
 517            }
 518        )
 519        # Rename all references to the old table name.
 520        for sql in self.deferred_sql:
 521            if isinstance(sql, Statement):
 522                sql.rename_table_references(old_db_table, new_db_table)
 523
    def add_field(self, model: type[Model], field: Field) -> None:
        """
        Create a field on a model by adding its column.

        Fields without a column behind them (column_sql() returns no
        definition) are skipped. A constrained foreign key gets its constraint
        either inline in the column definition or as deferred SQL. After the
        column is added, its database default is dropped again when one was
        used, and SQL for the field's indexes is deferred.

        Raises ValueError if an inline foreign key's remote `field_name` is
        None.
        """
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            isinstance(field, ForeignKeyField)
            and self.connection.features.supports_foreign_keys
            and field.db_constraint
        ):
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model.model_options.db_table
                field_name = field.remote_field.field_name
                if field_name is None:
                    raise ValueError("Foreign key field_name cannot be None")
                to_field = field.remote_field.model._model_meta.get_forward_field(
                    field_name
                )
                to_column = to_field.column
                # Only the namespace part of the (possibly qualified) table
                # name is needed for the inline template.
                namespace, _ = split_identifier(model.model_options.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model.model_options.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            # `params` is rebound here to the parameters of the DROP DEFAULT
            # change; the ADD COLUMN statement above has already run.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
 593
 594    def remove_field(self, model: type[Model], field: Field) -> None:
 595        """
 596        Remove a field from a model. Usually involves deleting a column,
 597        but for M2Ms may involve deleting a table.
 598        """
 599        # It might not actually have a column behind it
 600        if field.db_parameters(connection=self.connection)["type"] is None:
 601            return
 602        # Drop any FK constraints, MySQL requires explicit deletion
 603        if isinstance(field, RelatedField):
 604            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 605            for fk_name in fk_names:
 606                self.execute(self._delete_fk_sql(model, fk_name))
 607        # Delete the column
 608        sql = self.sql_delete_column % {
 609            "table": self.quote_name(model.model_options.db_table),
 610            "column": self.quote_name(field.column),
 611        }
 612        self.execute(sql)
 613        # Remove all deferred statements referencing the deleted column.
 614        for sql in list(self.deferred_sql):
 615            if isinstance(sql, Statement) and sql.references_column(
 616                model.model_options.db_table, field.column
 617            ):
 618                self.deferred_sql.remove(sql)
 619
    def alter_field(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        strict: bool = False,
    ) -> None:
        """
        Allow a field's type, uniqueness, nullability, default, column,
        constraints, etc. to be modified.
        `old_field` is required to compute the necessary changes.
        If `strict` is True, raise errors if the old column does not match
        `old_field` precisely.

        This method validates the two fields and delegates the actual work to
        _alter_field(). It raises ValueError when either field has no db type
        and the pair is not two many-to-many related fields (that pair is
        treated as a no-op).
        """
        # Nothing to do when the change doesn't affect the database.
        if not self._field_should_be_altered(old_field, new_field):
            return
        # Ensure this field is even column-based
        old_db_params = old_field.db_parameters(connection=self.connection)
        old_type = old_db_params["type"]
        new_db_params = new_field.db_parameters(connection=self.connection)
        new_type = new_db_params["type"]
        # A non-related field without a db type can't be altered at all.
        if (old_type is None and not isinstance(old_field, RelatedField)) or (
            new_type is None and not isinstance(new_field, RelatedField)
        ):
            raise ValueError(
                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
                "db_type (are you using a badly-written custom field?)",
            )
        elif (
            old_type is None
            and new_type is None
            and isinstance(old_field, RelatedField)
            and isinstance(old_field.remote_field, ManyToManyRel)
            and isinstance(new_field, RelatedField)
            and isinstance(new_field.remote_field, ManyToManyRel)
        ):
            # Both sides have through models; this is a no-op.
            return
        elif old_type is None or new_type is None:
            # Any remaining mix of typed and untyped fields is unsupported.
            raise ValueError(
                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
                "(you cannot alter to or from M2M fields, or add or remove "
                "through= on M2M fields)"
            )

        self._alter_field(
            model,
            old_field,
            new_field,
            old_type,
            new_type,
            old_db_params,
            new_db_params,
            strict,
        )
 675
 676    def _field_db_check(
 677        self, field: Field, field_db_params: DbParameters
 678    ) -> str | None:
 679        # Always check constraints with the same mocked column name to avoid
 680        # recreating constrains when the column is renamed.
 681        check_constraints = self.connection.data_type_check_constraints
 682        data = field.db_type_parameters(self.connection)
 683        data["column"] = "__column_name__"
 684        try:
 685            return check_constraints[field.get_internal_type()] % data
 686        except KeyError:
 687            return None
 688
 689    def _alter_field(
 690        self,
 691        model: type[Model],
 692        old_field: Field,
 693        new_field: Field,
 694        old_type: str,
 695        new_type: str,
 696        old_db_params: DbParameters,
 697        new_db_params: DbParameters,
 698        strict: bool = False,
 699    ) -> None:
 700        """Perform a "physical" (non-ManyToMany) field update."""
 701        # Drop any FK constraints, we'll remake them later
 702        fks_dropped = set()
 703        if (
 704            self.connection.features.supports_foreign_keys
 705            and isinstance(old_field, ForeignKeyField)
 706            and old_field.db_constraint
 707            and self._field_should_be_altered(
 708                old_field,
 709                new_field,
 710            )
 711        ):
 712            fk_names = self._constraint_names(
 713                model, [old_field.column], foreign_key=True
 714            )
 715            if strict and len(fk_names) != 1:
 716                raise ValueError(
 717                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
 718                )
 719            for fk_name in fk_names:
 720                fks_dropped.add((old_field.column,))
 721                self.execute(self._delete_fk_sql(model, fk_name))
 722        # Has unique been removed?
 723        if old_field.primary_key and (
 724            not new_field.primary_key
 725            or self._field_became_primary_key(old_field, new_field)
 726        ):
 727            # Find the unique constraint for this field
 728            meta_constraint_names = {
 729                constraint.name for constraint in model.model_options.constraints
 730            }
 731            constraint_names = self._constraint_names(
 732                model,
 733                [old_field.column],
 734                unique=True,
 735                primary_key=False,
 736                exclude=meta_constraint_names,
 737            )
 738            if strict and len(constraint_names) != 1:
 739                raise ValueError(
 740                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
 741                )
 742            for constraint_name in constraint_names:
 743                sql = self._delete_unique_sql(model, constraint_name)
 744                if sql is not None:
 745                    self.execute(sql)
 746        # Drop incoming FK constraints if the field is a primary key or unique,
 747        # which might be a to_field target, and things are going to change.
 748        drop_foreign_keys = (
 749            self.connection.features.supports_foreign_keys
 750            and (old_field.primary_key and new_field.primary_key)
 751            and old_type != new_type
 752        )
 753        if drop_foreign_keys:
 754            # '_model_meta.related_field' also contains M2M reverse fields, these
 755            # will be filtered out
 756            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
 757                rel_fk_names = self._constraint_names(
 758                    new_rel.related_model, [new_rel.field.column], foreign_key=True
 759                )
 760                for fk_name in rel_fk_names:
 761                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
 762        # Removed an index? (no strict check, as multiple indexes are possible)
 763        # Remove indexes if db_index switched to False or a unique constraint
 764        # will now be used in lieu of an index. The following lines from the
 765        # truth table show all True cases; the rest are False:
 766        #
 767        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 768        # ------------------------------------------------------------------------------
 769        # True               | False            | False              | False
 770        # True               | False            | False              | True
 771        # True               | False            | True               | True
 772        if (
 773            isinstance(old_field, ForeignKeyField)
 774            and old_field.db_index
 775            and not old_field.primary_key
 776            and (
 777                not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
 778                or new_field.primary_key
 779            )
 780        ):
 781            # Find the index for this field
 782            meta_index_names = {index.name for index in model.model_options.indexes}
 783            # Retrieve only BTREE indexes since this is what's created with
 784            # db_index=True.
 785            index_names = self._constraint_names(
 786                model,
 787                [old_field.column],
 788                index=True,
 789                type_=Index.suffix,
 790                exclude=meta_index_names,
 791            )
 792            for index_name in index_names:
 793                # The only way to check if an index was created with
 794                # db_index=True or with Index(['field'], name='foo')
 795                # is to look at its name (refs #28053).
 796                self.execute(self._delete_index_sql(model, index_name))
 797        # Change check constraints?
 798        old_db_check = self._field_db_check(old_field, old_db_params)
 799        new_db_check = self._field_db_check(new_field, new_db_params)
 800        if old_db_check != new_db_check and old_db_check:
 801            meta_constraint_names = {
 802                constraint.name for constraint in model.model_options.constraints
 803            }
 804            constraint_names = self._constraint_names(
 805                model,
 806                [old_field.column],
 807                check=True,
 808                exclude=meta_constraint_names,
 809            )
 810            if strict and len(constraint_names) != 1:
 811                raise ValueError(
 812                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
 813                )
 814            for constraint_name in constraint_names:
 815                sql = self._delete_check_sql(model, constraint_name)
 816                if sql is not None:
 817                    self.execute(sql)
 818        # Have they renamed the column?
 819        if old_field.column != new_field.column:
 820            self.execute(
 821                self._rename_field_sql(
 822                    model.model_options.db_table, old_field, new_field, new_type
 823                )
 824            )
 825            # Rename all references to the renamed column.
 826            for sql in self.deferred_sql:
 827                if isinstance(sql, Statement):
 828                    sql.rename_column_references(
 829                        model.model_options.db_table, old_field.column, new_field.column
 830                    )
 831        # Next, start accumulating actions to do
 832        actions = []
 833        null_actions = []
 834        post_actions = []
 835        # Type suffix change? (e.g. auto increment).
 836        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
 837        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
 838        # Type change?
 839        if old_type != new_type or old_type_suffix != new_type_suffix:
 840            fragment, other_actions = self._alter_column_type_sql(
 841                model, old_field, new_field, new_type
 842            )
 843            actions.append(fragment)
 844            post_actions.extend(other_actions)
 845        # When changing a column NULL constraint to NOT NULL with a given
 846        # default value, we need to perform 4 steps:
 847        #  1. Add a default for new incoming writes
 848        #  2. Update existing NULL rows with new default
 849        #  3. Replace NULL constraint with NOT NULL
 850        #  4. Drop the default again.
 851        # Default change?
 852        needs_database_default = False
 853        if old_field.allow_null and not new_field.allow_null:
 854            old_default = self.effective_default(old_field)
 855            new_default = self.effective_default(new_field)
 856            if (
 857                not self.skip_default_on_alter(new_field)
 858                and old_default != new_default
 859                and new_default is not None
 860            ):
 861                needs_database_default = True
 862                actions.append(
 863                    self._alter_column_default_sql(model, old_field, new_field)
 864                )
 865        # Nullability change?
 866        if old_field.allow_null != new_field.allow_null:
 867            fragment = self._alter_column_null_sql(model, old_field, new_field)
 868            if fragment:
 869                null_actions.append(fragment)
 870        # Only if we have a default and there is a change from NULL to NOT NULL
 871        four_way_default_alteration = new_field.has_default() and (
 872            old_field.allow_null and not new_field.allow_null
 873        )
 874        if actions or null_actions:
 875            if not four_way_default_alteration:
 876                # If we don't have to do a 4-way default alteration we can
 877                # directly run a (NOT) NULL alteration
 878                actions += null_actions
 879            # Combine actions together if we can (e.g. postgres)
 880            if self.connection.features.supports_combined_alters and actions:
 881                sql, params = tuple(zip(*actions))
 882                actions = [(", ".join(sql), sum(params, []))]
 883            # Apply those actions
 884            for sql, params in actions:
 885                self.execute(
 886                    self.sql_alter_column
 887                    % {
 888                        "table": self.quote_name(model.model_options.db_table),
 889                        "changes": sql,
 890                    },
 891                    params,
 892                )
 893            if four_way_default_alteration:
 894                # Update existing rows with default value
 895                self.execute(
 896                    self.sql_update_with_default
 897                    % {
 898                        "table": self.quote_name(model.model_options.db_table),
 899                        "column": self.quote_name(new_field.column),
 900                        "default": "%s",
 901                    },
 902                    [new_default],
 903                )
 904                # Since we didn't run a NOT NULL change before we need to do it
 905                # now
 906                for sql, params in null_actions:
 907                    self.execute(
 908                        self.sql_alter_column
 909                        % {
 910                            "table": self.quote_name(model.model_options.db_table),
 911                            "changes": sql,
 912                        },
 913                        params,
 914                    )
 915        if post_actions:
 916            for sql, params in post_actions:
 917                self.execute(sql, params)
 918        # If primary_key changed to False, delete the primary key constraint.
 919        if old_field.primary_key and not new_field.primary_key:
 920            self._delete_primary_key(model, strict)
 921
 922        # Added an index? Add an index if db_index switched to True or a unique
 923        # constraint will no longer be used in lieu of an index. The following
 924        # lines from the truth table show all True cases; the rest are False:
 925        #
 926        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 927        # ------------------------------------------------------------------------------
 928        # False              | False            | True               | False
 929        # False              | True             | True               | False
 930        # True               | True             | True               | False
 931        if (
 932            (
 933                not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
 934                or old_field.primary_key
 935            )
 936            and isinstance(new_field, ForeignKeyField)
 937            and new_field.db_index
 938            and not new_field.primary_key
 939        ):
 940            self.execute(self._create_index_sql(model, fields=[new_field]))
 941        # Type alteration on primary key? Then we need to alter the column
 942        # referring to us.
 943        rels_to_update = []
 944        if drop_foreign_keys:
 945            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
 946        # Changed to become primary key?
 947        if self._field_became_primary_key(old_field, new_field):
 948            # Make the new one
 949            self.execute(self._create_primary_key_sql(model, new_field))
 950            # Update all referencing columns
 951            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
 952        # Handle our type alters on the other end of rels from the PK stuff above
 953        for old_rel, new_rel in rels_to_update:
 954            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
 955            rel_type = rel_db_params["type"]
 956            fragment, other_actions = self._alter_column_type_sql(
 957                new_rel.related_model,
 958                old_rel.field,
 959                new_rel.field,
 960                rel_type,
 961            )
 962            self.execute(
 963                self.sql_alter_column
 964                % {
 965                    "table": self.quote_name(
 966                        new_rel.related_model.model_options.db_table
 967                    ),
 968                    "changes": fragment[0],
 969                },
 970                fragment[1],
 971            )
 972            for sql, params in other_actions:
 973                self.execute(sql, params)
 974        # Does it have a foreign key?
 975        if (
 976            self.connection.features.supports_foreign_keys
 977            and isinstance(new_field, ForeignKeyField)
 978            and (
 979                fks_dropped
 980                or not isinstance(old_field, ForeignKeyField)
 981                or not old_field.db_constraint
 982            )
 983            and new_field.db_constraint
 984        ):
 985            self.execute(
 986                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
 987            )
 988        # Rebuild FKs that pointed to us if we previously had to drop them
 989        if drop_foreign_keys:
 990            for _, rel in rels_to_update:
 991                if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
 992                    self.execute(
 993                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
 994                    )
 995        # Does it have check constraints we need to add?
 996        if old_db_check != new_db_check and new_db_check:
 997            constraint_name = self._create_index_name(
 998                model.model_options.db_table, [new_field.column], suffix="_check"
 999            )
1000            new_check = new_db_params["check"]
1001            assert new_check is not None  # Guaranteed by new_db_check check above
1002            sql = self._create_check_sql(model, constraint_name, new_check)
1003            if sql is not None:
1004                self.execute(sql)
1005        # Drop the default if we need to
1006        # (Plain usually does not use in-database defaults)
1007        if needs_database_default:
1008            changes_sql, params = self._alter_column_default_sql(
1009                model, old_field, new_field, drop=True
1010            )
1011            sql = self.sql_alter_column % {
1012                "table": self.quote_name(model.model_options.db_table),
1013                "changes": changes_sql,
1014            }
1015            self.execute(sql, params)
1016
1017    def _alter_column_null_sql(
1018        self, model: type[Model], old_field: Field, new_field: Field
1019    ) -> tuple[str, list[Any]]:
1020        """
1021        Hook to specialize column null alteration.
1022
1023        Return a (sql, params) fragment to set a column to null or non-null
1024        as required by new_field, or None if no changes are required.
1025        """
1026        new_db_params = new_field.db_parameters(connection=self.connection)
1027        sql = (
1028            self.sql_alter_column_null
1029            if new_field.allow_null
1030            else self.sql_alter_column_not_null
1031        )
1032        return (
1033            sql
1034            % {
1035                "column": self.quote_name(new_field.column),
1036                "type": new_db_params["type"],
1037            },
1038            [],
1039        )
1040
1041    def _alter_column_default_sql(
1042        self,
1043        model: type[Model],
1044        old_field: Field | None,
1045        new_field: Field,
1046        drop: bool = False,
1047    ) -> tuple[str, list[Any]]:
1048        """
1049        Hook to specialize column default alteration.
1050
1051        Return a (sql, params) fragment to add or drop (depending on the drop
1052        argument) a default to new_field's column.
1053        """
1054        new_default = self.effective_default(new_field)
1055        default = self._column_default_sql(new_field)
1056        params = [new_default]
1057
1058        if drop:
1059            params = []
1060        elif self.connection.features.requires_literal_defaults:
1061            # Some databases (Oracle) can't take defaults as a parameter
1062            # If this is the case, the SchemaEditor for that database should
1063            # implement prepare_default().
1064            default = self.prepare_default(new_default)
1065            params = []
1066
1067        new_db_params = new_field.db_parameters(connection=self.connection)
1068        if drop:
1069            if new_field.allow_null:
1070                sql = self.sql_alter_column_no_default_null
1071            else:
1072                sql = self.sql_alter_column_no_default
1073        else:
1074            sql = self.sql_alter_column_default
1075        return (
1076            sql
1077            % {
1078                "column": self.quote_name(new_field.column),
1079                "type": new_db_params["type"],
1080                "default": default,
1081            },
1082            params,
1083        )
1084
1085    def _alter_column_type_sql(
1086        self,
1087        model: type[Model],
1088        old_field: Field,
1089        new_field: Field,
1090        new_type: str,
1091    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1092        """
1093        Hook to specialize column type alteration for different backends,
1094        for cases when a creation type is different to an alteration type
1095        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1096
1097        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1098        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1099        run once the field is altered.
1100        """
1101        other_actions: list[tuple[str, list[Any]]] = []
1102        return (
1103            (
1104                self.sql_alter_column_type
1105                % {
1106                    "column": self.quote_name(new_field.column),
1107                    "type": new_type,
1108                },
1109                [],
1110            ),
1111            other_actions,
1112        )
1113
1114    def _alter_many_to_many(
1115        self,
1116        model: type[Model],
1117        old_field: ManyToManyField,
1118        new_field: ManyToManyField,
1119        strict: bool,
1120    ) -> None:
1121        """Alter M2Ms to repoint their to= endpoints."""
1122        # Type narrow for ManyToManyField.remote_field
1123        old_rel: ManyToManyRel = old_field.remote_field
1124        new_rel: ManyToManyRel = new_field.remote_field
1125
1126        # Rename the through table
1127        if (
1128            old_rel.through.model_options.db_table
1129            != new_rel.through.model_options.db_table
1130        ):
1131            self.alter_db_table(
1132                old_rel.through,
1133                old_rel.through.model_options.db_table,
1134                new_rel.through.model_options.db_table,
1135            )
1136        # Repoint the FK to the other side
1137        old_reverse_field = old_rel.through._model_meta.get_forward_field(
1138            old_field.m2m_reverse_field_name()
1139        )
1140        new_reverse_field = new_rel.through._model_meta.get_forward_field(
1141            new_field.m2m_reverse_field_name()
1142        )
1143        self.alter_field(
1144            new_rel.through,
1145            # The field that points to the target model is needed, so we can
1146            # tell alter_field to change it - this is m2m_reverse_field_name()
1147            # (as opposed to m2m_field_name(), which points to our model).
1148            old_reverse_field,
1149            new_reverse_field,
1150        )
1151        old_m2m_field = old_rel.through._model_meta.get_forward_field(
1152            old_field.m2m_field_name()
1153        )
1154        new_m2m_field = new_rel.through._model_meta.get_forward_field(
1155            new_field.m2m_field_name()
1156        )
1157        self.alter_field(
1158            new_rel.through,
1159            # for self-referential models we need to alter field from the other end too
1160            old_m2m_field,
1161            new_m2m_field,
1162        )
1163
1164    def _create_index_name(
1165        self, table_name: str, column_names: list[str], suffix: str = ""
1166    ) -> str:
1167        """
1168        Generate a unique name for an index/unique constraint.
1169
1170        The name is divided into 3 parts: the table name, the column names,
1171        and a unique digest and suffix.
1172        """
1173        _, table_name = split_identifier(table_name)
1174        hash_suffix_part = (
1175            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1176        )
1177        max_length = self.connection.ops.max_name_length() or 200
1178        # If everything fits into max_length, use that name.
1179        index_name = "{}_{}_{}".format(
1180            table_name, "_".join(column_names), hash_suffix_part
1181        )
1182        if len(index_name) <= max_length:
1183            return index_name
1184        # Shorten a long suffix.
1185        if len(hash_suffix_part) > max_length / 3:
1186            hash_suffix_part = hash_suffix_part[: max_length // 3]
1187        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1188        index_name = "{}_{}_{}".format(
1189            table_name[:other_length],
1190            "_".join(column_names)[:other_length],
1191            hash_suffix_part,
1192        )
1193        # Prepend D if needed to prevent the name from starting with an
1194        # underscore or a number (not permitted on Oracle).
1195        if index_name[0] == "_" or index_name[0].isdigit():
1196            index_name = f"D{index_name[:-1]}"
1197        return index_name
1198
1199    def _index_condition_sql(self, condition: str | None) -> str:
1200        if condition:
1201            return " WHERE " + condition
1202        return ""
1203
1204    def _index_include_sql(
1205        self, model: type[Model], columns: list[str] | None
1206    ) -> str | Statement:
1207        if not columns or not self.connection.features.supports_covering_indexes:
1208            return ""
1209        return Statement(
1210            " INCLUDE (%(columns)s)",
1211            columns=Columns(model.model_options.db_table, columns, self.quote_name),
1212        )
1213
1214    def _create_index_sql(
1215        self,
1216        model: type[Model],
1217        *,
1218        fields: list[Field] | None = None,
1219        name: str | None = None,
1220        suffix: str = "",
1221        using: str = "",
1222        col_suffixes: tuple[str, ...] = (),
1223        sql: str | None = None,
1224        opclasses: tuple[str, ...] = (),
1225        condition: str | None = None,
1226        include: list[str] | None = None,
1227        expressions: Any = None,
1228    ) -> Statement:
1229        """
1230        Return the SQL statement to create the index for one or several fields
1231        or expressions. `sql` can be specified if the syntax differs from the
1232        standard (GIS indexes, ...).
1233        """
1234        fields = fields or []
1235        expressions = expressions or []
1236        compiler = Query(model, alias_cols=False).get_compiler()
1237        columns = [field.column for field in fields]
1238        sql_create_index = sql or self.sql_create_index
1239        table = model.model_options.db_table
1240
1241        def create_index_name(*args: Any, **kwargs: Any) -> str:
1242            nonlocal name
1243            if name is None:
1244                name = self._create_index_name(*args, **kwargs)
1245            return self.quote_name(name)
1246
1247        return Statement(
1248            sql_create_index,
1249            table=Table(table, self.quote_name),
1250            name=IndexName(table, columns, suffix, create_index_name),
1251            using=using,
1252            columns=(
1253                self._index_columns(table, columns, col_suffixes, opclasses)
1254                if columns
1255                else Expressions(table, expressions, compiler, self.quote_value)
1256            ),
1257            extra="",
1258            condition=self._index_condition_sql(condition),
1259            include=self._index_include_sql(model, include),
1260        )
1261
1262    def _delete_index_sql(
1263        self, model: type[Model], name: str, sql: str | None = None
1264    ) -> Statement:
1265        return Statement(
1266            sql or self.sql_delete_index,
1267            table=Table(model.model_options.db_table, self.quote_name),
1268            name=self.quote_name(name),
1269        )
1270
1271    def _rename_index_sql(
1272        self, model: type[Model], old_name: str, new_name: str
1273    ) -> Statement:
1274        return Statement(
1275            self.sql_rename_index,
1276            table=Table(model.model_options.db_table, self.quote_name),
1277            old_name=self.quote_name(old_name),
1278            new_name=self.quote_name(new_name),
1279        )
1280
1281    def _index_columns(
1282        self,
1283        table: str,
1284        columns: list[str],
1285        col_suffixes: tuple[str, ...],
1286        opclasses: tuple[str, ...],
1287    ) -> Columns:
1288        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1289
1290    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1291        """
1292        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1293        """
1294        output: list[Statement | None] = []
1295        for field in model._model_meta.local_fields:
1296            output.extend(self._field_indexes_sql(model, field))
1297
1298        for index in model.model_options.indexes:
1299            if (
1300                not index.contains_expressions
1301                or self.connection.features.supports_expression_indexes
1302            ):
1303                output.append(index.create_sql(model, self))
1304        return output
1305
1306    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1307        """
1308        Return a list of all index SQL statements for the specified field.
1309        """
1310        output: list[Statement] = []
1311        if self._field_should_be_indexed(model, field):
1312            output.append(self._create_index_sql(model, fields=[field]))
1313        return output
1314
1315    def _field_should_be_altered(
1316        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1317    ) -> bool:
1318        ignore = ignore or set()
1319        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1320        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1321        # Don't alter when:
1322        # - changing only a field name
1323        # - changing an attribute that doesn't affect the schema
1324        # - changing an attribute in the provided set of ignored attributes
1325        for attr in ignore.union(old_field.non_db_attrs):
1326            old_kwargs.pop(attr, None)
1327        for attr in ignore.union(new_field.non_db_attrs):
1328            new_kwargs.pop(attr, None)
1329        return self.quote_name(old_field.column) != self.quote_name(
1330            new_field.column
1331        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1332
1333    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1334        if isinstance(field, ForeignKeyField):
1335            return bool(field.remote_field) and field.db_index and not field.primary_key
1336        return False
1337
1338    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1339        return not old_field.primary_key and new_field.primary_key
1340
1341    def _rename_field_sql(
1342        self, table: str, old_field: Field, new_field: Field, new_type: str
1343    ) -> str:
1344        return self.sql_rename_column % {
1345            "table": self.quote_name(table),
1346            "old_column": self.quote_name(old_field.column),
1347            "new_column": self.quote_name(new_field.column),
1348            "type": new_type,
1349        }
1350
1351    def _create_fk_sql(
1352        self, model: type[Model], field: ForeignKeyField, suffix: str
1353    ) -> Statement:
1354        table = Table(model.model_options.db_table, self.quote_name)
1355        name = self._fk_constraint_name(model, field, suffix)
1356        column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1357        to_table = Table(
1358            field.target_field.model.model_options.db_table, self.quote_name
1359        )
1360        to_column = Columns(
1361            field.target_field.model.model_options.db_table,
1362            [field.target_field.column],
1363            self.quote_name,
1364        )
1365        deferrable = self.connection.ops.deferrable_sql()
1366        return Statement(
1367            self.sql_create_fk,
1368            table=table,
1369            name=name,
1370            column=column,
1371            to_table=to_table,
1372            to_column=to_column,
1373            deferrable=deferrable,
1374        )
1375
1376    def _fk_constraint_name(
1377        self, model: type[Model], field: ForeignKeyField, suffix: str
1378    ) -> ForeignKeyName:
1379        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1380            return self.quote_name(self._create_index_name(*args, **kwargs))
1381
1382        return ForeignKeyName(
1383            model.model_options.db_table,
1384            [field.column],
1385            split_identifier(field.target_field.model.model_options.db_table)[1],
1386            [field.target_field.column],
1387            suffix,
1388            create_fk_name,
1389        )
1390
1391    def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1392        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1393
1394    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1395        if deferrable is None:
1396            return ""
1397        if deferrable == Deferrable.DEFERRED:
1398            return " DEFERRABLE INITIALLY DEFERRED"
1399        if deferrable == Deferrable.IMMEDIATE:
1400            return " DEFERRABLE INITIALLY IMMEDIATE"
1401        return ""
1402
1403    def _unique_sql(
1404        self,
1405        model: type[Model],
1406        fields: Iterable[Field],
1407        name: str,
1408        condition: str | None = None,
1409        deferrable: Deferrable | None = None,
1410        include: list[str] | None = None,
1411        opclasses: tuple[str, ...] | None = None,
1412        expressions: Any = None,
1413    ) -> str | None:
1414        if (
1415            deferrable
1416            and not self.connection.features.supports_deferrable_unique_constraints
1417        ):
1418            return None
1419        if condition or include or opclasses or expressions:
1420            # Databases support conditional, covering, and functional unique
1421            # constraints via a unique index.
1422            sql = self._create_unique_sql(
1423                model,
1424                fields,
1425                name=name,
1426                condition=condition,
1427                include=include,
1428                opclasses=opclasses,
1429                expressions=expressions,
1430            )
1431            if sql:
1432                self.deferred_sql.append(sql)
1433            return None
1434        constraint = self.sql_unique_constraint % {
1435            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1436            "deferrable": self._deferrable_constraint_sql(deferrable),
1437        }
1438        return self.sql_constraint % {
1439            "name": self.quote_name(name),
1440            "constraint": constraint,
1441        }
1442
1443    def _create_unique_sql(
1444        self,
1445        model: type[Model],
1446        fields: Iterable[Field],
1447        name: str | None = None,
1448        condition: str | None = None,
1449        deferrable: Deferrable | None = None,
1450        include: list[str] | None = None,
1451        opclasses: tuple[str, ...] | None = None,
1452        expressions: Any = None,
1453    ) -> Statement | None:
1454        if (
1455            (
1456                deferrable
1457                and not self.connection.features.supports_deferrable_unique_constraints
1458            )
1459            or (condition and not self.connection.features.supports_partial_indexes)
1460            or (include and not self.connection.features.supports_covering_indexes)
1461            or (
1462                expressions and not self.connection.features.supports_expression_indexes
1463            )
1464        ):
1465            return None
1466
1467        compiler = Query(model, alias_cols=False).get_compiler()
1468        table = model.model_options.db_table
1469        columns = [field.column for field in fields]
1470        constraint_name: IndexName | str
1471        if name is None:
1472            constraint_name = self._unique_constraint_name(table, columns, quote=True)
1473        else:
1474            constraint_name = self.quote_name(name)
1475        if condition or include or opclasses or expressions:
1476            sql = self.sql_create_unique_index
1477        else:
1478            sql = self.sql_create_unique
1479        if columns:
1480            columns_obj: Columns | Expressions = self._index_columns(
1481                table, columns, col_suffixes=(), opclasses=opclasses or ()
1482            )
1483        else:
1484            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1485        return Statement(
1486            sql,
1487            table=Table(table, self.quote_name),
1488            name=constraint_name,
1489            columns=columns_obj,
1490            condition=self._index_condition_sql(condition),
1491            deferrable=self._deferrable_constraint_sql(deferrable),
1492            include=self._index_include_sql(model, include),
1493        )
1494
1495    def _unique_constraint_name(
1496        self, table: str, columns: list[str], quote: bool = True
1497    ) -> IndexName | str:
1498        if quote:
1499
1500            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1501                return self.quote_name(self._create_index_name(*args, **kwargs))
1502
1503        else:
1504            create_unique_name = self._create_index_name
1505
1506        return IndexName(table, columns, "_uniq", create_unique_name)
1507
1508    def _delete_unique_sql(
1509        self,
1510        model: type[Model],
1511        name: str,
1512        condition: str | None = None,
1513        deferrable: Deferrable | None = None,
1514        include: list[str] | None = None,
1515        opclasses: tuple[str, ...] | None = None,
1516        expressions: Any = None,
1517    ) -> Statement | None:
1518        if (
1519            (
1520                deferrable
1521                and not self.connection.features.supports_deferrable_unique_constraints
1522            )
1523            or (condition and not self.connection.features.supports_partial_indexes)
1524            or (include and not self.connection.features.supports_covering_indexes)
1525            or (
1526                expressions and not self.connection.features.supports_expression_indexes
1527            )
1528        ):
1529            return None
1530        if condition or include or opclasses or expressions:
1531            sql = self.sql_delete_index
1532        else:
1533            sql = self.sql_delete_unique
1534        return self._delete_constraint_sql(sql, model, name)
1535
1536    def _check_sql(self, name: str, check: str) -> str:
1537        return self.sql_constraint % {
1538            "name": self.quote_name(name),
1539            "constraint": self.sql_check_constraint % {"check": check},
1540        }
1541
1542    def _create_check_sql(
1543        self, model: type[Model], name: str, check: str
1544    ) -> Statement | None:
1545        if not self.connection.features.supports_table_check_constraints:
1546            return None
1547        return Statement(
1548            self.sql_create_check,
1549            table=Table(model.model_options.db_table, self.quote_name),
1550            name=self.quote_name(name),
1551            check=check,
1552        )
1553
1554    def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1555        if not self.connection.features.supports_table_check_constraints:
1556            return None
1557        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1558
1559    def _delete_constraint_sql(
1560        self, template: str, model: type[Model], name: str
1561    ) -> Statement:
1562        return Statement(
1563            template,
1564            table=Table(model.model_options.db_table, self.quote_name),
1565            name=self.quote_name(name),
1566        )
1567
1568    def _constraint_names(
1569        self,
1570        model: type[Model],
1571        column_names: list[str] | None = None,
1572        unique: bool | None = None,
1573        primary_key: bool | None = None,
1574        index: bool | None = None,
1575        foreign_key: bool | None = None,
1576        check: bool | None = None,
1577        type_: str | None = None,
1578        exclude: set[str] | None = None,
1579    ) -> list[str]:
1580        """Return all constraint names matching the columns and conditions."""
1581        if column_names is not None:
1582            column_names = [
1583                self.connection.introspection.identifier_converter(
1584                    truncate_name(name, self.connection.ops.max_name_length())
1585                )
1586                if self.connection.features.truncates_names
1587                else self.connection.introspection.identifier_converter(name)
1588                for name in column_names
1589            ]
1590        with self.connection.cursor() as cursor:
1591            constraints = self.connection.introspection.get_constraints(
1592                cursor, model.model_options.db_table
1593            )
1594        result: list[str] = []
1595        for name, infodict in constraints.items():
1596            if column_names is None or column_names == infodict["columns"]:
1597                if unique is not None and infodict["unique"] != unique:
1598                    continue
1599                if primary_key is not None and infodict["primary_key"] != primary_key:
1600                    continue
1601                if index is not None and infodict["index"] != index:
1602                    continue
1603                if check is not None and infodict["check"] != check:
1604                    continue
1605                if foreign_key is not None and not infodict["foreign_key"]:
1606                    continue
1607                if type_ is not None and infodict["type"] != type_:
1608                    continue
1609                if not exclude or name not in exclude:
1610                    result.append(name)
1611        return result
1612
1613    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1614        constraint_names = self._constraint_names(model, primary_key=True)
1615        if strict and len(constraint_names) != 1:
1616            raise ValueError(
1617                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1618            )
1619        for constraint_name in constraint_names:
1620            self.execute(self._delete_primary_key_sql(model, constraint_name))
1621
1622    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1623        return Statement(
1624            self.sql_create_pk,
1625            table=Table(model.model_options.db_table, self.quote_name),
1626            name=self.quote_name(
1627                self._create_index_name(
1628                    model.model_options.db_table, [field.column], suffix="_pk"
1629                )
1630            ),
1631            columns=Columns(
1632                model.model_options.db_table, [field.column], self.quote_name
1633            ),
1634        )
1635
1636    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1637        return self._delete_constraint_sql(self.sql_delete_pk, model, name)