Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import logging
   4import operator
   5from abc import ABC, abstractmethod
   6from collections.abc import Generator
   7from datetime import datetime
   8from typing import TYPE_CHECKING, Any
   9
  10if TYPE_CHECKING:
  11    from typing import Self
  12
  13from plain.models.backends.ddl_references import (
  14    Columns,
  15    Expressions,
  16    ForeignKeyName,
  17    IndexName,
  18    Statement,
  19    Table,
  20)
  21from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  22from plain.models.constraints import Deferrable
  23from plain.models.fields import Field
  24from plain.models.fields.related import ForeignKeyField, RelatedField
  25from plain.models.fields.reverse_related import ManyToManyRel
  26from plain.models.indexes import Index
  27from plain.models.sql import Query
  28from plain.models.transaction import TransactionManagementError, atomic
  29from plain.utils import timezone
  30
  31if TYPE_CHECKING:
  32    from collections.abc import Iterable
  33
  34    from plain.models.backends.base.base import BaseDatabaseWrapper
  35    from plain.models.base import Model
  36    from plain.models.constraints import BaseConstraint
  37    from plain.models.fields import Field
  38    from plain.models.fields.related import ForeignKeyField, ManyToManyField
  39    from plain.models.fields.reverse_related import ManyToManyRel
  40
  41logger = logging.getLogger("plain.models.backends.schema")
  42
  43
  44def _is_relevant_relation(relation: Any, altered_field: Field) -> bool:
  45    """
  46    When altering the given field, must constraints on its model from the given
  47    relation be temporarily dropped?
  48    """
  49    from plain.models.fields.related import ManyToManyField
  50
  51    field = relation.field
  52    if isinstance(field, ManyToManyField):
  53        # M2M reverse field
  54        return False
  55    if altered_field.primary_key:
  56        # Foreign key constraint on the primary key, which is being altered.
  57        return True
  58    # ForeignKeyField always targets 'id'
  59    return altered_field.name == "id"
  60
  61
  62def _all_related_fields(model: type[Model]) -> list[Any]:
  63    # Related fields must be returned in a deterministic order.
  64    return sorted(
  65        model._model_meta._get_fields(
  66            forward=False,
  67            reverse=True,
  68        ),
  69        key=operator.attrgetter("name"),
  70    )
  71
  72
  73def _related_non_m2m_objects(
  74    old_field: Field, new_field: Field
  75) -> Generator[tuple[Any, Any], None, None]:
  76    # Filter out m2m objects from reverse relations.
  77    # Return (old_relation, new_relation) tuples.
  78    related_fields = zip(
  79        (
  80            obj
  81            for obj in _all_related_fields(old_field.model)
  82            if _is_relevant_relation(obj, old_field)
  83        ),
  84        (
  85            obj
  86            for obj in _all_related_fields(new_field.model)
  87            if _is_relevant_relation(obj, new_field)
  88        ),
  89    )
  90    for old_rel, new_rel in related_fields:
  91        yield old_rel, new_rel
  92        yield from _related_non_m2m_objects(
  93            old_rel.remote_field,
  94            new_rel.remote_field,
  95        )
  96
  97
  98class BaseDatabaseSchemaEditor(ABC):
  99    """
 100    This class and its subclasses are responsible for emitting schema-changing
 101    statements to the databases - model creation/removal/alteration, field
 102    renaming, index fiddling, and so on.
 103    """
 104
 105    # Overrideable SQL templates
 106    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
 107    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
 108    sql_delete_table = "DROP TABLE %(table)s CASCADE"
 109
 110    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
 111    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
 112    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
 113    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
 114    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
 115    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
 116    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
 117    sql_alter_column_no_default_null = sql_alter_column_no_default
 118    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
 119    sql_rename_column = (
 120        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
 121    )
 122    sql_update_with_default = (
 123        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
 124    )
 125
 126    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
 127    sql_check_constraint = "CHECK (%(check)s)"
 128    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
 129    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
 130
 131    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
 132    sql_delete_check = sql_delete_constraint
 133
 134    sql_create_unique = (
 135        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
 136        "UNIQUE (%(columns)s)%(deferrable)s"
 137    )
 138    sql_delete_unique = sql_delete_constraint
 139
 140    sql_create_fk = (
 141        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 142        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
 143    )
 144    sql_create_inline_fk = None
 145    sql_create_column_inline_fk = None
 146    sql_delete_fk = sql_delete_constraint
 147
 148    sql_create_index = (
 149        "CREATE INDEX %(name)s ON %(table)s "
 150        "(%(columns)s)%(include)s%(extra)s%(condition)s"
 151    )
 152    sql_create_unique_index = (
 153        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
 154        "(%(columns)s)%(include)s%(condition)s"
 155    )
 156    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
 157    sql_delete_index = "DROP INDEX %(name)s"
 158
 159    sql_create_pk = (
 160        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 161    )
 162    sql_delete_pk = sql_delete_constraint
 163
 164    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
 165    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 166
 167    def __init__(
 168        self,
 169        connection: BaseDatabaseWrapper,
 170        atomic: bool = True,
 171    ):
 172        self.connection = connection
 173        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 174
 175    # State-managing methods
 176
 177    def __enter__(self) -> Self:
 178        self.deferred_sql: list[Any] = []
 179        self.executed_sql: list[str] = []
 180        if self.atomic_migration:
 181            self.atomic = atomic()
 182            self.atomic.__enter__()
 183        return self
 184
 185    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 186        if exc_type is None:
 187            for sql in self.deferred_sql:
 188                self.execute(sql)
 189        if self.atomic_migration:
 190            self.atomic.__exit__(exc_type, exc_value, traceback)
 191
 192    # Core utility functions
 193
 194    def execute(
 195        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 196    ) -> None:
 197        """Execute the given SQL statement, with optional parameters."""
 198        if (
 199            self.connection.in_atomic_block
 200            and not self.connection.features.can_rollback_ddl
 201        ):
 202            raise TransactionManagementError(
 203                "Executing DDL statements while in a transaction on databases "
 204                "that can't perform a rollback is prohibited."
 205            )
 206        # Account for non-string statement objects.
 207        sql = str(sql)
 208        # Log the command we're running, then run it
 209        logger.debug(
 210            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 211        )
 212
 213        # Track executed SQL for display in migration output
 214        # Store the SQL for display (interpolate params for readability)
 215        if params:
 216            self.executed_sql.append(sql % tuple(map(self.quote_value, params)))
 217        else:
 218            self.executed_sql.append(sql)
 219
 220        with self.connection.cursor() as cursor:
 221            cursor.execute(sql, params)
 222
 223    def quote_name(self, name: str) -> str:
 224        return self.connection.ops.quote_name(name)
 225
 226    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
 227        """Take a model and return its table definition."""
 228        # Create column SQL, add FK deferreds if needed.
 229        column_sqls = []
 230        params = []
 231        for field in model._model_meta.local_fields:
 232            # SQL.
 233            definition, extra_params = self.column_sql(model, field)
 234            if definition is None:
 235                continue
 236            # Check constraints can go on the column SQL here.
 237            db_params = field.db_parameters(connection=self.connection)
 238            if db_params["check"]:
 239                definition += " " + self.sql_check_constraint % db_params
 240            # Autoincrement SQL (for backends with inline variant).
 241            col_type_suffix = field.db_type_suffix(connection=self.connection)
 242            if col_type_suffix:
 243                definition += f" {col_type_suffix}"
 244            if extra_params:
 245                params.extend(extra_params)
 246            # FK.
 247            if isinstance(field, ForeignKeyField) and field.db_constraint:
 248                to_table = field.remote_field.model.model_options.db_table
 249                field_name = field.remote_field.field_name
 250                if field_name is None:
 251                    raise ValueError("Foreign key field_name cannot be None")
 252                to_field = field.remote_field.model._model_meta.get_forward_field(
 253                    field_name
 254                )
 255                to_column = to_field.column
 256                if self.sql_create_inline_fk:
 257                    definition += " " + self.sql_create_inline_fk % {
 258                        "to_table": self.quote_name(to_table),
 259                        "to_column": self.quote_name(to_column),
 260                    }
 261                elif self.connection.features.supports_foreign_keys:
 262                    self.deferred_sql.append(
 263                        self._create_fk_sql(
 264                            model, field, "_fk_%(to_table)s_%(to_column)s"
 265                        )
 266                    )
 267            # Add the SQL to our big list.
 268            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
 269            # Autoincrement SQL (for backends with post table definition
 270            # variant).
 271            if field.get_internal_type() in ("PrimaryKeyField",):
 272                autoinc_sql = self.connection.ops.autoinc_sql(
 273                    model.model_options.db_table, field.column
 274                )
 275                if autoinc_sql:
 276                    self.deferred_sql.extend(autoinc_sql)
 277        constraints = [
 278            constraint.constraint_sql(model, self)
 279            for constraint in model.model_options.constraints
 280        ]
 281        sql = self.sql_create_table % {
 282            "table": self.quote_name(model.model_options.db_table),
 283            "definition": ", ".join(
 284                str(constraint)
 285                for constraint in (*column_sqls, *constraints)
 286                if constraint
 287            ),
 288        }
 289        return sql, params
 290
 291    # Field <-> database mapping functions
 292
 293    def _iter_column_sql(
 294        self,
 295        column_db_type: str,
 296        params: list[Any],
 297        model: type[Model],
 298        field: Field,
 299        field_db_params: dict[str, Any],
 300        include_default: bool,
 301    ) -> Generator[str, None, None]:
 302        yield column_db_type
 303        if collation := field_db_params.get("collation"):
 304            yield self._collate_sql(collation)
 305        if self.connection.features.supports_comments_inline and field.db_comment:
 306            yield self._comment_sql(field.db_comment)
 307        # Work out nullability.
 308        null = field.allow_null
 309        # Include a default value, if requested.
 310        include_default = (
 311            include_default
 312            and not self.skip_default(field)
 313            and
 314            # Don't include a default value if it's a nullable field and the
 315            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 316            # MySQL longtext and longblob).
 317            not (null and self.skip_default_on_alter(field))
 318        )
 319        if include_default:
 320            default_value = self.effective_default(field)
 321            if default_value is not None:
 322                column_default = "DEFAULT " + self._column_default_sql(field)
 323                if self.connection.features.requires_literal_defaults:
 324                    # Some databases can't take defaults as a parameter (Oracle).
 325                    # If this is the case, the individual schema backend should
 326                    # implement prepare_default().
 327                    yield column_default % self.prepare_default(default_value)
 328                else:
 329                    yield column_default
 330                    params.append(default_value)
 331
 332        if not null:
 333            yield "NOT NULL"
 334        elif not self.connection.features.implied_column_null:
 335            yield "NULL"
 336
 337        if field.primary_key:
 338            yield "PRIMARY KEY"
 339
 340    def column_sql(
 341        self, model: type[Model], field: Field, include_default: bool = False
 342    ) -> tuple[str | None, list[Any] | None]:
 343        """
 344        Return the column definition for a field. The field must already have
 345        had set_attributes_from_name() called.
 346        """
 347        # Get the column's type and use that as the basis of the SQL.
 348        field_db_params = field.db_parameters(connection=self.connection)
 349        column_db_type = field_db_params["type"]
 350        # Check for fields that aren't actually columns (e.g. M2M).
 351        if column_db_type is None:
 352            return None, None
 353        params: list[Any] = []
 354        return (
 355            " ".join(
 356                # This appends to the params being returned.
 357                self._iter_column_sql(
 358                    column_db_type,
 359                    params,
 360                    model,
 361                    field,
 362                    field_db_params,
 363                    include_default,
 364                )
 365            ),
 366            params,
 367        )
 368
 369    def skip_default(self, field: Field) -> bool:
 370        """
 371        Some backends don't accept default values for certain columns types
 372        (i.e. MySQL longtext and longblob).
 373        """
 374        return False
 375
 376    def skip_default_on_alter(self, field: Field) -> bool:
 377        """
 378        Some backends don't accept default values for certain columns types
 379        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 380        """
 381        return False
 382
 383    def prepare_default(self, value: Any) -> str:
 384        """
 385        Only used for backends which have requires_literal_defaults feature
 386        """
 387        raise NotImplementedError(
 388            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 389            "requires_literal_defaults must provide a prepare_default() method"
 390        )
 391
 392    def _column_default_sql(self, field: Field) -> str:
 393        """
 394        Return the SQL to use in a DEFAULT clause. The resulting string should
 395        contain a '%s' placeholder for a default value.
 396        """
 397        return "%s"
 398
 399    @staticmethod
 400    def _effective_default(field: Field) -> Any:
 401        # This method allows testing its logic without a connection.
 402        if field.has_default():
 403            default = field.get_default()
 404        elif (
 405            not field.allow_null and not field.required and field.empty_strings_allowed
 406        ):
 407            if field.get_internal_type() == "BinaryField":
 408                default = b""
 409            else:
 410                default = ""
 411        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 412            internal_type = field.get_internal_type()
 413            if internal_type == "DateTimeField":
 414                default = timezone.now()
 415            else:
 416                default = datetime.now()
 417                if internal_type == "DateField":
 418                    default = default.date()
 419                elif internal_type == "TimeField":
 420                    default = default.time()
 421        else:
 422            default = None
 423        return default
 424
 425    def effective_default(self, field: Field) -> Any:
 426        """Return a field's effective database default value."""
 427        return field.get_db_prep_save(self._effective_default(field), self.connection)
 428
 429    @abstractmethod
 430    def quote_value(self, value: Any) -> str:
 431        """
 432        Return a quoted version of the value so it's safe to use in an SQL
 433        string. This is not safe against injection from user code; it is
 434        intended only for use in making SQL scripts or preparing default values
 435        for particularly tricky backends (defaults are not user-defined, though,
 436        so this is safe).
 437        """
 438        ...
 439
 440    # Actions
 441
 442    def create_model(self, model: type[Model]) -> None:
 443        """
 444        Create a table and any accompanying indexes or unique constraints for
 445        the given `model`.
 446        """
 447        sql, params = self.table_sql(model)
 448        # Prevent using [] as params, in the case a literal '%' is used in the
 449        # definition.
 450        self.execute(sql, params or None)
 451
 452        if self.connection.features.supports_comments:
 453            # Add table comment.
 454            if model.model_options.db_table_comment:
 455                self.alter_db_table_comment(
 456                    model, None, model.model_options.db_table_comment
 457                )
 458            # Add column comments.
 459            if not self.connection.features.supports_comments_inline:
 460                for field in model._model_meta.local_fields:
 461                    if field.db_comment:
 462                        field_db_params = field.db_parameters(
 463                            connection=self.connection
 464                        )
 465                        field_type = field_db_params["type"]
 466                        self.execute(
 467                            *self._alter_column_comment_sql(
 468                                model, field, field_type, field.db_comment
 469                            )
 470                        )
 471        # Add any field index (deferred as SQLite _remake_table needs it).
 472        self.deferred_sql.extend(self._model_indexes_sql(model))
 473
 474    def delete_model(self, model: type[Model]) -> None:
 475        """Delete a model from the database."""
 476
 477        # Delete the table
 478        self.execute(
 479            self.sql_delete_table
 480            % {
 481                "table": self.quote_name(model.model_options.db_table),
 482            }
 483        )
 484        # Remove all deferred statements referencing the deleted table.
 485        for sql in list(self.deferred_sql):
 486            if isinstance(sql, Statement) and sql.references_table(
 487                model.model_options.db_table
 488            ):
 489                self.deferred_sql.remove(sql)
 490
 491    def add_index(self, model: type[Model], index: Index) -> None:
 492        """Add an index on a model."""
 493        if (
 494            index.contains_expressions
 495            and not self.connection.features.supports_expression_indexes
 496        ):
 497            return None
 498        # Index.create_sql returns interpolated SQL which makes params=None a
 499        # necessity to avoid escaping attempts on execution.
 500        self.execute(index.create_sql(model, self), params=None)
 501
 502    def remove_index(self, model: type[Model], index: Index) -> None:
 503        """Remove an index from a model."""
 504        if (
 505            index.contains_expressions
 506            and not self.connection.features.supports_expression_indexes
 507        ):
 508            return None
 509        self.execute(index.remove_sql(model, self))
 510
 511    def rename_index(
 512        self, model: type[Model], old_index: Index, new_index: Index
 513    ) -> None:
 514        if self.connection.features.can_rename_index:
 515            self.execute(
 516                self._rename_index_sql(model, old_index.name, new_index.name),
 517                params=None,
 518            )
 519        else:
 520            self.remove_index(model, old_index)
 521            self.add_index(model, new_index)
 522
 523    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 524        """Add a constraint to a model."""
 525        sql = constraint.create_sql(model, self)
 526        if sql:
 527            # Constraint.create_sql returns interpolated SQL which makes
 528            # params=None a necessity to avoid escaping attempts on execution.
 529            self.execute(sql, params=None)
 530
 531    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 532        """Remove a constraint from a model."""
 533        sql = constraint.remove_sql(model, self)
 534        if sql:
 535            self.execute(sql)
 536
 537    def alter_db_table(
 538        self, model: type[Model], old_db_table: str, new_db_table: str
 539    ) -> None:
 540        """Rename the table a model points to."""
 541        if old_db_table == new_db_table or (
 542            self.connection.features.ignores_table_name_case
 543            and old_db_table.lower() == new_db_table.lower()
 544        ):
 545            return
 546        self.execute(
 547            self.sql_rename_table
 548            % {
 549                "old_table": self.quote_name(old_db_table),
 550                "new_table": self.quote_name(new_db_table),
 551            }
 552        )
 553        # Rename all references to the old table name.
 554        for sql in self.deferred_sql:
 555            if isinstance(sql, Statement):
 556                sql.rename_table_references(old_db_table, new_db_table)
 557
 558    def alter_db_table_comment(
 559        self,
 560        model: type[Model],
 561        old_db_table_comment: str | None,
 562        new_db_table_comment: str | None,
 563    ) -> None:
 564        self.execute(
 565            self.sql_alter_table_comment
 566            % {
 567                "table": self.quote_name(model.model_options.db_table),
 568                "comment": self.quote_value(new_db_table_comment or ""),
 569            }
 570        )
 571
 572    def add_field(self, model: type[Model], field: Field) -> None:
 573        """
 574        Create a field on a model. Usually involves adding a column, but may
 575        involve adding a table instead (for M2M fields).
 576        """
 577        # Get the column's definition
 578        definition, params = self.column_sql(model, field, include_default=True)
 579        # It might not actually have a column behind it
 580        if definition is None:
 581            return
 582        if col_type_suffix := field.db_type_suffix(connection=self.connection):
 583            definition += f" {col_type_suffix}"
 584        # Check constraints can go on the column SQL here
 585        db_params = field.db_parameters(connection=self.connection)
 586        if db_params["check"]:
 587            definition += " " + self.sql_check_constraint % db_params
 588        if (
 589            isinstance(field, ForeignKeyField)
 590            and self.connection.features.supports_foreign_keys
 591            and field.db_constraint
 592        ):
 593            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
 594            # Add FK constraint inline, if supported.
 595            if self.sql_create_column_inline_fk:
 596                to_table = field.remote_field.model.model_options.db_table
 597                field_name = field.remote_field.field_name
 598                if field_name is None:
 599                    raise ValueError("Foreign key field_name cannot be None")
 600                to_field = field.remote_field.model._model_meta.get_forward_field(
 601                    field_name
 602                )
 603                to_column = to_field.column
 604                namespace, _ = split_identifier(model.model_options.db_table)
 605                definition += " " + self.sql_create_column_inline_fk % {
 606                    "name": self._fk_constraint_name(model, field, constraint_suffix),
 607                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
 608                    "column": self.quote_name(field.column),
 609                    "to_table": self.quote_name(to_table),
 610                    "to_column": self.quote_name(to_column),
 611                    "deferrable": self.connection.ops.deferrable_sql(),
 612                }
 613            # Otherwise, add FK constraints later.
 614            else:
 615                self.deferred_sql.append(
 616                    self._create_fk_sql(model, field, constraint_suffix)
 617                )
 618        # Build the SQL and run it
 619        sql = self.sql_create_column % {
 620            "table": self.quote_name(model.model_options.db_table),
 621            "column": self.quote_name(field.column),
 622            "definition": definition,
 623        }
 624        self.execute(sql, params)
 625        # Drop the default if we need to
 626        # (Plain usually does not use in-database defaults)
 627        if (
 628            not self.skip_default_on_alter(field)
 629            and self.effective_default(field) is not None
 630        ):
 631            changes_sql, params = self._alter_column_default_sql(
 632                model, None, field, drop=True
 633            )
 634            sql = self.sql_alter_column % {
 635                "table": self.quote_name(model.model_options.db_table),
 636                "changes": changes_sql,
 637            }
 638            self.execute(sql, params)
 639        # Add field comment, if required.
 640        if (
 641            field.db_comment
 642            and self.connection.features.supports_comments
 643            and not self.connection.features.supports_comments_inline
 644        ):
 645            field_type = db_params["type"]
 646            self.execute(
 647                *self._alter_column_comment_sql(
 648                    model, field, field_type, field.db_comment
 649                )
 650            )
 651        # Add an index, if required
 652        self.deferred_sql.extend(self._field_indexes_sql(model, field))
 653        # Reset connection if required
 654        if self.connection.features.connection_persists_old_columns:
 655            self.connection.close()
 656
 657    def remove_field(self, model: type[Model], field: Field) -> None:
 658        """
 659        Remove a field from a model. Usually involves deleting a column,
 660        but for M2Ms may involve deleting a table.
 661        """
 662        # It might not actually have a column behind it
 663        if field.db_parameters(connection=self.connection)["type"] is None:
 664            return
 665        # Drop any FK constraints, MySQL requires explicit deletion
 666        if isinstance(field, RelatedField):
 667            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 668            for fk_name in fk_names:
 669                self.execute(self._delete_fk_sql(model, fk_name))
 670        # Delete the column
 671        sql = self.sql_delete_column % {
 672            "table": self.quote_name(model.model_options.db_table),
 673            "column": self.quote_name(field.column),
 674        }
 675        self.execute(sql)
 676        # Reset connection if required
 677        if self.connection.features.connection_persists_old_columns:
 678            self.connection.close()
 679        # Remove all deferred statements referencing the deleted column.
 680        for sql in list(self.deferred_sql):
 681            if isinstance(sql, Statement) and sql.references_column(
 682                model.model_options.db_table, field.column
 683            ):
 684                self.deferred_sql.remove(sql)
 685
 686    def alter_field(
 687        self,
 688        model: type[Model],
 689        old_field: Field,
 690        new_field: Field,
 691        strict: bool = False,
 692    ) -> None:
 693        """
 694        Allow a field's type, uniqueness, nullability, default, column,
 695        constraints, etc. to be modified.
 696        `old_field` is required to compute the necessary changes.
 697        If `strict` is True, raise errors if the old column does not match
 698        `old_field` precisely.
 699        """
 700        if not self._field_should_be_altered(old_field, new_field):
 701            return
 702        # Ensure this field is even column-based
 703        old_db_params = old_field.db_parameters(connection=self.connection)
 704        old_type = old_db_params["type"]
 705        new_db_params = new_field.db_parameters(connection=self.connection)
 706        new_type = new_db_params["type"]
 707        if (old_type is None and not isinstance(old_field, RelatedField)) or (
 708            new_type is None and not isinstance(new_field, RelatedField)
 709        ):
 710            raise ValueError(
 711                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 712                "db_type (are you using a badly-written custom field?)",
 713            )
 714        elif (
 715            old_type is None
 716            and new_type is None
 717            and isinstance(old_field, RelatedField)
 718            and isinstance(old_field.remote_field, ManyToManyRel)
 719            and isinstance(new_field, RelatedField)
 720            and isinstance(new_field.remote_field, ManyToManyRel)
 721        ):
 722            # Both sides have through models; this is a no-op.
 723            return
 724        elif old_type is None or new_type is None:
 725            raise ValueError(
 726                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 727                "(you cannot alter to or from M2M fields, or add or remove "
 728                "through= on M2M fields)"
 729            )
 730
 731        self._alter_field(
 732            model,
 733            old_field,
 734            new_field,
 735            old_type,
 736            new_type,
 737            old_db_params,
 738            new_db_params,
 739            strict,
 740        )
 741
 742    def _field_db_check(
 743        self, field: Field, field_db_params: dict[str, Any]
 744    ) -> str | None:
 745        # Always check constraints with the same mocked column name to avoid
 746        # recreating constrains when the column is renamed.
 747        check_constraints = self.connection.data_type_check_constraints
 748        data = field.db_type_parameters(self.connection)
 749        data["column"] = "__column_name__"
 750        try:
 751            return check_constraints[field.get_internal_type()] % data
 752        except KeyError:
 753            return None
 754
 755    def _alter_field(
 756        self,
 757        model: type[Model],
 758        old_field: Field,
 759        new_field: Field,
 760        old_type: str,
 761        new_type: str,
 762        old_db_params: dict[str, Any],
 763        new_db_params: dict[str, Any],
 764        strict: bool = False,
 765    ) -> None:
 766        """Perform a "physical" (non-ManyToMany) field update."""
 767        # Drop any FK constraints, we'll remake them later
 768        fks_dropped = set()
 769        if (
 770            self.connection.features.supports_foreign_keys
 771            and isinstance(old_field, ForeignKeyField)
 772            and old_field.db_constraint
 773            and self._field_should_be_altered(
 774                old_field,
 775                new_field,
 776                ignore={"db_comment"},
 777            )
 778        ):
 779            fk_names = self._constraint_names(
 780                model, [old_field.column], foreign_key=True
 781            )
 782            if strict and len(fk_names) != 1:
 783                raise ValueError(
 784                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
 785                )
 786            for fk_name in fk_names:
 787                fks_dropped.add((old_field.column,))
 788                self.execute(self._delete_fk_sql(model, fk_name))
 789        # Has unique been removed?
 790        if old_field.primary_key and (
 791            not new_field.primary_key
 792            or self._field_became_primary_key(old_field, new_field)
 793        ):
 794            # Find the unique constraint for this field
 795            meta_constraint_names = {
 796                constraint.name for constraint in model.model_options.constraints
 797            }
 798            constraint_names = self._constraint_names(
 799                model,
 800                [old_field.column],
 801                unique=True,
 802                primary_key=False,
 803                exclude=meta_constraint_names,
 804            )
 805            if strict and len(constraint_names) != 1:
 806                raise ValueError(
 807                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
 808                )
 809            for constraint_name in constraint_names:
 810                sql = self._delete_unique_sql(model, constraint_name)
 811                if sql is not None:
 812                    self.execute(sql)
 813        # Drop incoming FK constraints if the field is a primary key or unique,
 814        # which might be a to_field target, and things are going to change.
 815        old_collation = old_db_params.get("collation")
 816        new_collation = new_db_params.get("collation")
 817        drop_foreign_keys = (
 818            self.connection.features.supports_foreign_keys
 819            and (old_field.primary_key and new_field.primary_key)
 820            and ((old_type != new_type) or (old_collation != new_collation))
 821        )
 822        if drop_foreign_keys:
 823            # '_model_meta.related_field' also contains M2M reverse fields, these
 824            # will be filtered out
 825            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
 826                rel_fk_names = self._constraint_names(
 827                    new_rel.related_model, [new_rel.field.column], foreign_key=True
 828                )
 829                for fk_name in rel_fk_names:
 830                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
 831        # Removed an index? (no strict check, as multiple indexes are possible)
 832        # Remove indexes if db_index switched to False or a unique constraint
 833        # will now be used in lieu of an index. The following lines from the
 834        # truth table show all True cases; the rest are False:
 835        #
 836        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 837        # ------------------------------------------------------------------------------
 838        # True               | False            | False              | False
 839        # True               | False            | False              | True
 840        # True               | False            | True               | True
 841        if (
 842            isinstance(old_field, ForeignKeyField)
 843            and old_field.db_index
 844            and not old_field.primary_key
 845            and (
 846                not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
 847                or new_field.primary_key
 848            )
 849        ):
 850            # Find the index for this field
 851            meta_index_names = {index.name for index in model.model_options.indexes}
 852            # Retrieve only BTREE indexes since this is what's created with
 853            # db_index=True.
 854            index_names = self._constraint_names(
 855                model,
 856                [old_field.column],
 857                index=True,
 858                type_=Index.suffix,
 859                exclude=meta_index_names,
 860            )
 861            for index_name in index_names:
 862                # The only way to check if an index was created with
 863                # db_index=True or with Index(['field'], name='foo')
 864                # is to look at its name (refs #28053).
 865                self.execute(self._delete_index_sql(model, index_name))
 866        # Change check constraints?
 867        old_db_check = self._field_db_check(old_field, old_db_params)
 868        new_db_check = self._field_db_check(new_field, new_db_params)
 869        if old_db_check != new_db_check and old_db_check:
 870            meta_constraint_names = {
 871                constraint.name for constraint in model.model_options.constraints
 872            }
 873            constraint_names = self._constraint_names(
 874                model,
 875                [old_field.column],
 876                check=True,
 877                exclude=meta_constraint_names,
 878            )
 879            if strict and len(constraint_names) != 1:
 880                raise ValueError(
 881                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
 882                )
 883            for constraint_name in constraint_names:
 884                sql = self._delete_check_sql(model, constraint_name)
 885                if sql is not None:
 886                    self.execute(sql)
 887        # Have they renamed the column?
 888        if old_field.column != new_field.column:
 889            self.execute(
 890                self._rename_field_sql(
 891                    model.model_options.db_table, old_field, new_field, new_type
 892                )
 893            )
 894            # Rename all references to the renamed column.
 895            for sql in self.deferred_sql:
 896                if isinstance(sql, Statement):
 897                    sql.rename_column_references(
 898                        model.model_options.db_table, old_field.column, new_field.column
 899                    )
 900        # Next, start accumulating actions to do
 901        actions = []
 902        null_actions = []
 903        post_actions = []
 904        # Type suffix change? (e.g. auto increment).
 905        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
 906        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
 907        # Type, collation, or comment change?
 908        if (
 909            old_type != new_type
 910            or old_type_suffix != new_type_suffix
 911            or old_collation != new_collation
 912            or (
 913                self.connection.features.supports_comments
 914                and old_field.db_comment != new_field.db_comment
 915            )
 916        ):
 917            fragment, other_actions = self._alter_column_type_sql(
 918                model, old_field, new_field, new_type, old_collation, new_collation
 919            )
 920            actions.append(fragment)
 921            post_actions.extend(other_actions)
 922        # When changing a column NULL constraint to NOT NULL with a given
 923        # default value, we need to perform 4 steps:
 924        #  1. Add a default for new incoming writes
 925        #  2. Update existing NULL rows with new default
 926        #  3. Replace NULL constraint with NOT NULL
 927        #  4. Drop the default again.
 928        # Default change?
 929        needs_database_default = False
 930        if old_field.allow_null and not new_field.allow_null:
 931            old_default = self.effective_default(old_field)
 932            new_default = self.effective_default(new_field)
 933            if (
 934                not self.skip_default_on_alter(new_field)
 935                and old_default != new_default
 936                and new_default is not None
 937            ):
 938                needs_database_default = True
 939                actions.append(
 940                    self._alter_column_default_sql(model, old_field, new_field)
 941                )
 942        # Nullability change?
 943        if old_field.allow_null != new_field.allow_null:
 944            fragment = self._alter_column_null_sql(model, old_field, new_field)
 945            if fragment:
 946                null_actions.append(fragment)
 947        # Only if we have a default and there is a change from NULL to NOT NULL
 948        four_way_default_alteration = new_field.has_default() and (
 949            old_field.allow_null and not new_field.allow_null
 950        )
 951        if actions or null_actions:
 952            if not four_way_default_alteration:
 953                # If we don't have to do a 4-way default alteration we can
 954                # directly run a (NOT) NULL alteration
 955                actions += null_actions
 956            # Combine actions together if we can (e.g. postgres)
 957            if self.connection.features.supports_combined_alters and actions:
 958                sql, params = tuple(zip(*actions))
 959                actions = [(", ".join(sql), sum(params, []))]
 960            # Apply those actions
 961            for sql, params in actions:
 962                self.execute(
 963                    self.sql_alter_column
 964                    % {
 965                        "table": self.quote_name(model.model_options.db_table),
 966                        "changes": sql,
 967                    },
 968                    params,
 969                )
 970            if four_way_default_alteration:
 971                # Update existing rows with default value
 972                self.execute(
 973                    self.sql_update_with_default
 974                    % {
 975                        "table": self.quote_name(model.model_options.db_table),
 976                        "column": self.quote_name(new_field.column),
 977                        "default": "%s",
 978                    },
 979                    [new_default],
 980                )
 981                # Since we didn't run a NOT NULL change before we need to do it
 982                # now
 983                for sql, params in null_actions:
 984                    self.execute(
 985                        self.sql_alter_column
 986                        % {
 987                            "table": self.quote_name(model.model_options.db_table),
 988                            "changes": sql,
 989                        },
 990                        params,
 991                    )
 992        if post_actions:
 993            for sql, params in post_actions:
 994                self.execute(sql, params)
 995        # If primary_key changed to False, delete the primary key constraint.
 996        if old_field.primary_key and not new_field.primary_key:
 997            self._delete_primary_key(model, strict)
 998
 999        # Added an index? Add an index if db_index switched to True or a unique
1000        # constraint will no longer be used in lieu of an index. The following
1001        # lines from the truth table show all True cases; the rest are False:
1002        #
1003        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
1004        # ------------------------------------------------------------------------------
1005        # False              | False            | True               | False
1006        # False              | True             | True               | False
1007        # True               | True             | True               | False
1008        if (
1009            (
1010                not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
1011                or old_field.primary_key
1012            )
1013            and isinstance(new_field, ForeignKeyField)
1014            and new_field.db_index
1015            and not new_field.primary_key
1016        ):
1017            self.execute(self._create_index_sql(model, fields=[new_field]))
1018        # Type alteration on primary key? Then we need to alter the column
1019        # referring to us.
1020        rels_to_update = []
1021        if drop_foreign_keys:
1022            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1023        # Changed to become primary key?
1024        if self._field_became_primary_key(old_field, new_field):
1025            # Make the new one
1026            self.execute(self._create_primary_key_sql(model, new_field))
1027            # Update all referencing columns
1028            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1029        # Handle our type alters on the other end of rels from the PK stuff above
1030        for old_rel, new_rel in rels_to_update:
1031            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1032            rel_type = rel_db_params["type"]
1033            rel_collation = rel_db_params.get("collation")
1034            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1035            old_rel_collation = old_rel_db_params.get("collation")
1036            fragment, other_actions = self._alter_column_type_sql(
1037                new_rel.related_model,
1038                old_rel.field,
1039                new_rel.field,
1040                rel_type,
1041                old_rel_collation,
1042                rel_collation,
1043            )
1044            self.execute(
1045                self.sql_alter_column
1046                % {
1047                    "table": self.quote_name(
1048                        new_rel.related_model.model_options.db_table
1049                    ),
1050                    "changes": fragment[0],
1051                },
1052                fragment[1],
1053            )
1054            for sql, params in other_actions:
1055                self.execute(sql, params)
1056        # Does it have a foreign key?
1057        if (
1058            self.connection.features.supports_foreign_keys
1059            and isinstance(new_field, ForeignKeyField)
1060            and (
1061                fks_dropped
1062                or not isinstance(old_field, ForeignKeyField)
1063                or not old_field.db_constraint
1064            )
1065            and new_field.db_constraint
1066        ):
1067            self.execute(
1068                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1069            )
1070        # Rebuild FKs that pointed to us if we previously had to drop them
1071        if drop_foreign_keys:
1072            for _, rel in rels_to_update:
1073                if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
1074                    self.execute(
1075                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
1076                    )
1077        # Does it have check constraints we need to add?
1078        if old_db_check != new_db_check and new_db_check:
1079            constraint_name = self._create_index_name(
1080                model.model_options.db_table, [new_field.column], suffix="_check"
1081            )
1082            sql = self._create_check_sql(model, constraint_name, new_db_params["check"])
1083            if sql is not None:
1084                self.execute(sql)
1085        # Drop the default if we need to
1086        # (Plain usually does not use in-database defaults)
1087        if needs_database_default:
1088            changes_sql, params = self._alter_column_default_sql(
1089                model, old_field, new_field, drop=True
1090            )
1091            sql = self.sql_alter_column % {
1092                "table": self.quote_name(model.model_options.db_table),
1093                "changes": changes_sql,
1094            }
1095            self.execute(sql, params)
1096        # Reset connection if required
1097        if self.connection.features.connection_persists_old_columns:
1098            self.connection.close()
1099
1100    def _alter_column_null_sql(
1101        self, model: type[Model], old_field: Field, new_field: Field
1102    ) -> tuple[str, list[Any]]:
1103        """
1104        Hook to specialize column null alteration.
1105
1106        Return a (sql, params) fragment to set a column to null or non-null
1107        as required by new_field, or None if no changes are required.
1108        """
1109        new_db_params = new_field.db_parameters(connection=self.connection)
1110        sql = (
1111            self.sql_alter_column_null
1112            if new_field.allow_null
1113            else self.sql_alter_column_not_null
1114        )
1115        return (
1116            sql
1117            % {
1118                "column": self.quote_name(new_field.column),
1119                "type": new_db_params["type"],
1120            },
1121            [],
1122        )
1123
1124    def _alter_column_default_sql(
1125        self,
1126        model: type[Model],
1127        old_field: Field | None,
1128        new_field: Field,
1129        drop: bool = False,
1130    ) -> tuple[str, list[Any]]:
1131        """
1132        Hook to specialize column default alteration.
1133
1134        Return a (sql, params) fragment to add or drop (depending on the drop
1135        argument) a default to new_field's column.
1136        """
1137        new_default = self.effective_default(new_field)
1138        default = self._column_default_sql(new_field)
1139        params = [new_default]
1140
1141        if drop:
1142            params = []
1143        elif self.connection.features.requires_literal_defaults:
1144            # Some databases (Oracle) can't take defaults as a parameter
1145            # If this is the case, the SchemaEditor for that database should
1146            # implement prepare_default().
1147            default = self.prepare_default(new_default)
1148            params = []
1149
1150        new_db_params = new_field.db_parameters(connection=self.connection)
1151        if drop:
1152            if new_field.allow_null:
1153                sql = self.sql_alter_column_no_default_null
1154            else:
1155                sql = self.sql_alter_column_no_default
1156        else:
1157            sql = self.sql_alter_column_default
1158        return (
1159            sql
1160            % {
1161                "column": self.quote_name(new_field.column),
1162                "type": new_db_params["type"],
1163                "default": default,
1164            },
1165            params,
1166        )
1167
1168    def _alter_column_type_sql(
1169        self,
1170        model: type[Model],
1171        old_field: Field,
1172        new_field: Field,
1173        new_type: str,
1174        old_collation: str | None,
1175        new_collation: str | None,
1176    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1177        """
1178        Hook to specialize column type alteration for different backends,
1179        for cases when a creation type is different to an alteration type
1180        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1181
1182        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1183        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1184        run once the field is altered.
1185        """
1186        other_actions = []
1187        if collate_sql := self._collate_sql(
1188            new_collation, old_collation, model.model_options.db_table
1189        ):
1190            collate_sql = f" {collate_sql}"
1191        else:
1192            collate_sql = ""
1193        # Comment change?
1194        from plain.models.fields.related import ManyToManyField
1195
1196        comment_sql = ""
1197        if self.connection.features.supports_comments and not isinstance(
1198            new_field, ManyToManyField
1199        ):
1200            if old_field.db_comment != new_field.db_comment:
1201                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1202                # 'COMMENT ON ...' at the same time.
1203                sql, params = self._alter_column_comment_sql(
1204                    model, new_field, new_type, new_field.db_comment
1205                )
1206                if sql:
1207                    other_actions.append((sql, params))
1208            if new_field.db_comment:
1209                comment_sql = self._comment_sql(new_field.db_comment)
1210        return (
1211            (
1212                self.sql_alter_column_type
1213                % {
1214                    "column": self.quote_name(new_field.column),
1215                    "type": new_type,
1216                    "collation": collate_sql,
1217                    "comment": comment_sql,
1218                },
1219                [],
1220            ),
1221            other_actions,
1222        )
1223
1224    def _alter_column_comment_sql(
1225        self,
1226        model: type[Model],
1227        new_field: Field,
1228        new_type: str,
1229        new_db_comment: str | None,
1230    ) -> tuple[str, list[Any]]:
1231        return (
1232            self.sql_alter_column_comment
1233            % {
1234                "table": self.quote_name(model.model_options.db_table),
1235                "column": self.quote_name(new_field.column),
1236                "comment": self._comment_sql(new_db_comment),
1237            },
1238            [],
1239        )
1240
1241    def _comment_sql(self, comment: str | None) -> str:
1242        return self.quote_value(comment or "")
1243
1244    def _alter_many_to_many(
1245        self,
1246        model: type[Model],
1247        old_field: ManyToManyField,
1248        new_field: ManyToManyField,
1249        strict: bool,
1250    ) -> None:
1251        """Alter M2Ms to repoint their to= endpoints."""
1252        # Type narrow for ManyToManyField.remote_field
1253        old_rel: ManyToManyRel = old_field.remote_field
1254        new_rel: ManyToManyRel = new_field.remote_field
1255
1256        # Rename the through table
1257        if (
1258            old_rel.through.model_options.db_table
1259            != new_rel.through.model_options.db_table
1260        ):
1261            self.alter_db_table(
1262                old_rel.through,
1263                old_rel.through.model_options.db_table,
1264                new_rel.through.model_options.db_table,
1265            )
1266        # Repoint the FK to the other side
1267        old_reverse_field = old_rel.through._model_meta.get_forward_field(
1268            old_field.m2m_reverse_field_name()
1269        )
1270        new_reverse_field = new_rel.through._model_meta.get_forward_field(
1271            new_field.m2m_reverse_field_name()
1272        )
1273        self.alter_field(
1274            new_rel.through,
1275            # The field that points to the target model is needed, so we can
1276            # tell alter_field to change it - this is m2m_reverse_field_name()
1277            # (as opposed to m2m_field_name(), which points to our model).
1278            old_reverse_field,
1279            new_reverse_field,
1280        )
1281        old_m2m_field = old_rel.through._model_meta.get_forward_field(
1282            old_field.m2m_field_name()
1283        )
1284        new_m2m_field = new_rel.through._model_meta.get_forward_field(
1285            new_field.m2m_field_name()
1286        )
1287        self.alter_field(
1288            new_rel.through,
1289            # for self-referential models we need to alter field from the other end too
1290            old_m2m_field,
1291            new_m2m_field,
1292        )
1293
1294    def _create_index_name(
1295        self, table_name: str, column_names: list[str], suffix: str = ""
1296    ) -> str:
1297        """
1298        Generate a unique name for an index/unique constraint.
1299
1300        The name is divided into 3 parts: the table name, the column names,
1301        and a unique digest and suffix.
1302        """
1303        _, table_name = split_identifier(table_name)
1304        hash_suffix_part = (
1305            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1306        )
1307        max_length = self.connection.ops.max_name_length() or 200
1308        # If everything fits into max_length, use that name.
1309        index_name = "{}_{}_{}".format(
1310            table_name, "_".join(column_names), hash_suffix_part
1311        )
1312        if len(index_name) <= max_length:
1313            return index_name
1314        # Shorten a long suffix.
1315        if len(hash_suffix_part) > max_length / 3:
1316            hash_suffix_part = hash_suffix_part[: max_length // 3]
1317        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1318        index_name = "{}_{}_{}".format(
1319            table_name[:other_length],
1320            "_".join(column_names)[:other_length],
1321            hash_suffix_part,
1322        )
1323        # Prepend D if needed to prevent the name from starting with an
1324        # underscore or a number (not permitted on Oracle).
1325        if index_name[0] == "_" or index_name[0].isdigit():
1326            index_name = f"D{index_name[:-1]}"
1327        return index_name
1328
1329    def _index_condition_sql(self, condition: str | None) -> str:
1330        if condition:
1331            return " WHERE " + condition
1332        return ""
1333
1334    def _index_include_sql(
1335        self, model: type[Model], columns: list[str] | None
1336    ) -> str | Statement:
1337        if not columns or not self.connection.features.supports_covering_indexes:
1338            return ""
1339        return Statement(
1340            " INCLUDE (%(columns)s)",
1341            columns=Columns(model.model_options.db_table, columns, self.quote_name),
1342        )
1343
1344    def _create_index_sql(
1345        self,
1346        model: type[Model],
1347        *,
1348        fields: list[Field] | None = None,
1349        name: str | None = None,
1350        suffix: str = "",
1351        using: str = "",
1352        col_suffixes: tuple[str, ...] = (),
1353        sql: str | None = None,
1354        opclasses: tuple[str, ...] = (),
1355        condition: str | None = None,
1356        include: list[str] | None = None,
1357        expressions: Any = None,
1358    ) -> Statement:
1359        """
1360        Return the SQL statement to create the index for one or several fields
1361        or expressions. `sql` can be specified if the syntax differs from the
1362        standard (GIS indexes, ...).
1363        """
1364        fields = fields or []
1365        expressions = expressions or []
1366        compiler = Query(model, alias_cols=False).get_compiler()
1367        columns = [field.column for field in fields]
1368        sql_create_index = sql or self.sql_create_index
1369        table = model.model_options.db_table
1370
1371        def create_index_name(*args: Any, **kwargs: Any) -> str:
1372            nonlocal name
1373            if name is None:
1374                name = self._create_index_name(*args, **kwargs)
1375            return self.quote_name(name)
1376
1377        return Statement(
1378            sql_create_index,
1379            table=Table(table, self.quote_name),
1380            name=IndexName(table, columns, suffix, create_index_name),
1381            using=using,
1382            columns=(
1383                self._index_columns(table, columns, col_suffixes, opclasses)
1384                if columns
1385                else Expressions(table, expressions, compiler, self.quote_value)
1386            ),
1387            extra="",
1388            condition=self._index_condition_sql(condition),
1389            include=self._index_include_sql(model, include),
1390        )
1391
1392    def _delete_index_sql(
1393        self, model: type[Model], name: str, sql: str | None = None
1394    ) -> Statement:
1395        return Statement(
1396            sql or self.sql_delete_index,
1397            table=Table(model.model_options.db_table, self.quote_name),
1398            name=self.quote_name(name),
1399        )
1400
1401    def _rename_index_sql(
1402        self, model: type[Model], old_name: str, new_name: str
1403    ) -> Statement:
1404        return Statement(
1405            self.sql_rename_index,
1406            table=Table(model.model_options.db_table, self.quote_name),
1407            old_name=self.quote_name(old_name),
1408            new_name=self.quote_name(new_name),
1409        )
1410
1411    def _index_columns(
1412        self,
1413        table: str,
1414        columns: list[str],
1415        col_suffixes: tuple[str, ...],
1416        opclasses: tuple[str, ...],
1417    ) -> Columns:
1418        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1419
1420    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1421        """
1422        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1423        """
1424        output: list[Statement | None] = []
1425        for field in model._model_meta.local_fields:
1426            output.extend(self._field_indexes_sql(model, field))
1427
1428        for index in model.model_options.indexes:
1429            if (
1430                not index.contains_expressions
1431                or self.connection.features.supports_expression_indexes
1432            ):
1433                output.append(index.create_sql(model, self))
1434        return output
1435
1436    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1437        """
1438        Return a list of all index SQL statements for the specified field.
1439        """
1440        output: list[Statement] = []
1441        if self._field_should_be_indexed(model, field):
1442            output.append(self._create_index_sql(model, fields=[field]))
1443        return output
1444
1445    def _field_should_be_altered(
1446        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1447    ) -> bool:
1448        ignore = ignore or set()
1449        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1450        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1451        # Don't alter when:
1452        # - changing only a field name
1453        # - changing an attribute that doesn't affect the schema
1454        # - changing an attribute in the provided set of ignored attributes
1455        # - adding only a db_column and the column name is not changed
1456        for attr in ignore.union(old_field.non_db_attrs):
1457            old_kwargs.pop(attr, None)
1458        for attr in ignore.union(new_field.non_db_attrs):
1459            new_kwargs.pop(attr, None)
1460        return self.quote_name(old_field.column) != self.quote_name(
1461            new_field.column
1462        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1463
1464    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1465        if isinstance(field, ForeignKeyField):
1466            return bool(field.remote_field) and field.db_index and not field.primary_key
1467        return False
1468
1469    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1470        return not old_field.primary_key and new_field.primary_key
1471
1472    def _rename_field_sql(
1473        self, table: str, old_field: Field, new_field: Field, new_type: str
1474    ) -> str:
1475        return self.sql_rename_column % {
1476            "table": self.quote_name(table),
1477            "old_column": self.quote_name(old_field.column),
1478            "new_column": self.quote_name(new_field.column),
1479            "type": new_type,
1480        }
1481
1482    def _create_fk_sql(
1483        self, model: type[Model], field: ForeignKeyField, suffix: str
1484    ) -> Statement:
1485        table = Table(model.model_options.db_table, self.quote_name)
1486        name = self._fk_constraint_name(model, field, suffix)
1487        column = Columns(model.model_options.db_table, [field.column], self.quote_name)
1488        to_table = Table(
1489            field.target_field.model.model_options.db_table, self.quote_name
1490        )
1491        to_column = Columns(
1492            field.target_field.model.model_options.db_table,
1493            [field.target_field.column],
1494            self.quote_name,
1495        )
1496        deferrable = self.connection.ops.deferrable_sql()
1497        return Statement(
1498            self.sql_create_fk,
1499            table=table,
1500            name=name,
1501            column=column,
1502            to_table=to_table,
1503            to_column=to_column,
1504            deferrable=deferrable,
1505        )
1506
1507    def _fk_constraint_name(
1508        self, model: type[Model], field: ForeignKeyField, suffix: str
1509    ) -> ForeignKeyName:
1510        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1511            return self.quote_name(self._create_index_name(*args, **kwargs))
1512
1513        return ForeignKeyName(
1514            model.model_options.db_table,
1515            [field.column],
1516            split_identifier(field.target_field.model.model_options.db_table)[1],
1517            [field.target_field.column],
1518            suffix,
1519            create_fk_name,
1520        )
1521
1522    def _delete_fk_sql(self, model: type[Model], name: str) -> Statement:
1523        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1524
1525    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1526        if deferrable is None:
1527            return ""
1528        if deferrable == Deferrable.DEFERRED:
1529            return " DEFERRABLE INITIALLY DEFERRED"
1530        if deferrable == Deferrable.IMMEDIATE:
1531            return " DEFERRABLE INITIALLY IMMEDIATE"
1532        return ""
1533
1534    def _unique_sql(
1535        self,
1536        model: type[Model],
1537        fields: Iterable[Field],
1538        name: str,
1539        condition: str | None = None,
1540        deferrable: Deferrable | None = None,
1541        include: list[str] | None = None,
1542        opclasses: tuple[str, ...] | None = None,
1543        expressions: Any = None,
1544    ) -> str | None:
1545        if (
1546            deferrable
1547            and not self.connection.features.supports_deferrable_unique_constraints
1548        ):
1549            return None
1550        if condition or include or opclasses or expressions:
1551            # Databases support conditional, covering, and functional unique
1552            # constraints via a unique index.
1553            sql = self._create_unique_sql(
1554                model,
1555                fields,
1556                name=name,
1557                condition=condition,
1558                include=include,
1559                opclasses=opclasses,
1560                expressions=expressions,
1561            )
1562            if sql:
1563                self.deferred_sql.append(sql)
1564            return None
1565        constraint = self.sql_unique_constraint % {
1566            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1567            "deferrable": self._deferrable_constraint_sql(deferrable),
1568        }
1569        return self.sql_constraint % {
1570            "name": self.quote_name(name),
1571            "constraint": constraint,
1572        }
1573
1574    def _create_unique_sql(
1575        self,
1576        model: type[Model],
1577        fields: Iterable[Field],
1578        name: str | None = None,
1579        condition: str | None = None,
1580        deferrable: Deferrable | None = None,
1581        include: list[str] | None = None,
1582        opclasses: tuple[str, ...] | None = None,
1583        expressions: Any = None,
1584    ) -> Statement | None:
1585        if (
1586            (
1587                deferrable
1588                and not self.connection.features.supports_deferrable_unique_constraints
1589            )
1590            or (condition and not self.connection.features.supports_partial_indexes)
1591            or (include and not self.connection.features.supports_covering_indexes)
1592            or (
1593                expressions and not self.connection.features.supports_expression_indexes
1594            )
1595        ):
1596            return None
1597
1598        compiler = Query(model, alias_cols=False).get_compiler()
1599        table = model.model_options.db_table
1600        columns = [field.column for field in fields]
1601        constraint_name: IndexName | str
1602        if name is None:
1603            constraint_name = self._unique_constraint_name(table, columns, quote=True)
1604        else:
1605            constraint_name = self.quote_name(name)
1606        if condition or include or opclasses or expressions:
1607            sql = self.sql_create_unique_index
1608        else:
1609            sql = self.sql_create_unique
1610        if columns:
1611            columns_obj: Columns | Expressions = self._index_columns(
1612                table, columns, col_suffixes=(), opclasses=opclasses or ()
1613            )
1614        else:
1615            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1616        return Statement(
1617            sql,
1618            table=Table(table, self.quote_name),
1619            name=constraint_name,
1620            columns=columns_obj,
1621            condition=self._index_condition_sql(condition),
1622            deferrable=self._deferrable_constraint_sql(deferrable),
1623            include=self._index_include_sql(model, include),
1624        )
1625
1626    def _unique_constraint_name(
1627        self, table: str, columns: list[str], quote: bool = True
1628    ) -> IndexName | str:
1629        if quote:
1630
1631            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1632                return self.quote_name(self._create_index_name(*args, **kwargs))
1633
1634        else:
1635            create_unique_name = self._create_index_name
1636
1637        return IndexName(table, columns, "_uniq", create_unique_name)
1638
1639    def _delete_unique_sql(
1640        self,
1641        model: type[Model],
1642        name: str,
1643        condition: str | None = None,
1644        deferrable: Deferrable | None = None,
1645        include: list[str] | None = None,
1646        opclasses: tuple[str, ...] | None = None,
1647        expressions: Any = None,
1648    ) -> Statement | None:
1649        if (
1650            (
1651                deferrable
1652                and not self.connection.features.supports_deferrable_unique_constraints
1653            )
1654            or (condition and not self.connection.features.supports_partial_indexes)
1655            or (include and not self.connection.features.supports_covering_indexes)
1656            or (
1657                expressions and not self.connection.features.supports_expression_indexes
1658            )
1659        ):
1660            return None
1661        if condition or include or opclasses or expressions:
1662            sql = self.sql_delete_index
1663        else:
1664            sql = self.sql_delete_unique
1665        return self._delete_constraint_sql(sql, model, name)
1666
1667    def _check_sql(self, name: str, check: str) -> str:
1668        return self.sql_constraint % {
1669            "name": self.quote_name(name),
1670            "constraint": self.sql_check_constraint % {"check": check},
1671        }
1672
1673    def _create_check_sql(
1674        self, model: type[Model], name: str, check: str
1675    ) -> Statement | None:
1676        if not self.connection.features.supports_table_check_constraints:
1677            return None
1678        return Statement(
1679            self.sql_create_check,
1680            table=Table(model.model_options.db_table, self.quote_name),
1681            name=self.quote_name(name),
1682            check=check,
1683        )
1684
1685    def _delete_check_sql(self, model: type[Model], name: str) -> Statement | None:
1686        if not self.connection.features.supports_table_check_constraints:
1687            return None
1688        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1689
1690    def _delete_constraint_sql(
1691        self, template: str, model: type[Model], name: str
1692    ) -> Statement:
1693        return Statement(
1694            template,
1695            table=Table(model.model_options.db_table, self.quote_name),
1696            name=self.quote_name(name),
1697        )
1698
1699    def _constraint_names(
1700        self,
1701        model: type[Model],
1702        column_names: list[str] | None = None,
1703        unique: bool | None = None,
1704        primary_key: bool | None = None,
1705        index: bool | None = None,
1706        foreign_key: bool | None = None,
1707        check: bool | None = None,
1708        type_: str | None = None,
1709        exclude: set[str] | None = None,
1710    ) -> list[str]:
1711        """Return all constraint names matching the columns and conditions."""
1712        if column_names is not None:
1713            column_names = [
1714                self.connection.introspection.identifier_converter(
1715                    truncate_name(name, self.connection.ops.max_name_length())
1716                )
1717                if self.connection.features.truncates_names
1718                else self.connection.introspection.identifier_converter(name)
1719                for name in column_names
1720            ]
1721        with self.connection.cursor() as cursor:
1722            constraints = self.connection.introspection.get_constraints(
1723                cursor, model.model_options.db_table
1724            )
1725        result: list[str] = []
1726        for name, infodict in constraints.items():
1727            if column_names is None or column_names == infodict["columns"]:
1728                if unique is not None and infodict["unique"] != unique:
1729                    continue
1730                if primary_key is not None and infodict["primary_key"] != primary_key:
1731                    continue
1732                if index is not None and infodict["index"] != index:
1733                    continue
1734                if check is not None and infodict["check"] != check:
1735                    continue
1736                if foreign_key is not None and not infodict["foreign_key"]:
1737                    continue
1738                if type_ is not None and infodict["type"] != type_:
1739                    continue
1740                if not exclude or name not in exclude:
1741                    result.append(name)
1742        return result
1743
1744    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1745        constraint_names = self._constraint_names(model, primary_key=True)
1746        if strict and len(constraint_names) != 1:
1747            raise ValueError(
1748                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1749            )
1750        for constraint_name in constraint_names:
1751            self.execute(self._delete_primary_key_sql(model, constraint_name))
1752
1753    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1754        return Statement(
1755            self.sql_create_pk,
1756            table=Table(model.model_options.db_table, self.quote_name),
1757            name=self.quote_name(
1758                self._create_index_name(
1759                    model.model_options.db_table, [field.column], suffix="_pk"
1760                )
1761            ),
1762            columns=Columns(
1763                model.model_options.db_table, [field.column], self.quote_name
1764            ),
1765        )
1766
1767    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1768        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1769
1770    def _collate_sql(
1771        self,
1772        collation: str | None,
1773        old_collation: str | None = None,
1774        table_name: str | None = None,
1775    ) -> str:
1776        return "COLLATE " + self.quote_name(collation) if collation else ""