1from __future__ import annotations
   2
   3import logging
   4import operator
   5from collections.abc import Callable, Generator
   6from copy import deepcopy
   7from datetime import datetime
   8from typing import TYPE_CHECKING, Any
   9
  10from psycopg import sql as psycopg_sql
  11
  12if TYPE_CHECKING:
  13    from typing import Self
  14
  15    from plain.models.sql.compiler import SQLCompiler
  16
  17from plain.models.constraints import Deferrable
  18from plain.models.fields import DbParameters, Field
  19from plain.models.fields.related import ForeignKeyField, RelatedField
  20from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
  21from plain.models.indexes import Index
  22from plain.models.postgres.sql import (
  23    DATA_TYPE_CHECK_CONSTRAINTS,
  24    DATA_TYPES,
  25    DEFERRABLE_SQL,
  26    MAX_NAME_LENGTH,
  27    quote_name,
  28)
  29from plain.models.postgres.utils import names_digest, split_identifier, strip_quotes
  30from plain.models.sql import Query
  31from plain.models.transaction import atomic
  32from plain.utils import timezone
  33
  34if TYPE_CHECKING:
  35    from collections.abc import Iterable
  36
  37    from plain.models.base import Model
  38    from plain.models.constraints import BaseConstraint
  39    from plain.models.fields import Field
  40    from plain.models.fields.related import ForeignKeyField
  41    from plain.models.fields.reverse_related import ManyToManyRel
  42    from plain.models.postgres.wrapper import DatabaseWrapper
  43
  44logger = logging.getLogger("plain.models.postgres.schema")
  45
  46
  47# ##### DDL Reference classes (for deferred DDL statement manipulation) #####
  48
  49
  50class Table:
  51    """Hold a reference to a table."""
  52
  53    def __init__(self, table: str) -> None:
  54        self.table = table
  55
  56    def references_table(self, table: str) -> bool:
  57        return self.table == table
  58
  59    def references_column(self, table: str, column: str) -> bool:
  60        return False
  61
  62    def rename_table_references(self, old_table: str, new_table: str) -> None:
  63        if self.table == old_table:
  64            self.table = new_table
  65
  66    def rename_column_references(
  67        self, table: str, old_column: str, new_column: str
  68    ) -> None:
  69        pass
  70
  71    def __repr__(self) -> str:
  72        return f"<{self.__class__.__name__} {str(self)!r}>"
  73
  74    def __str__(self) -> str:
  75        return quote_name(self.table)
  76
  77
  78class TableColumns(Table):
  79    """Base class for references to multiple columns of a table."""
  80
  81    def __init__(self, table: str, columns: list[str]) -> None:
  82        self.table = table
  83        self.columns = columns
  84
  85    def references_column(self, table: str, column: str) -> bool:
  86        return self.table == table and column in self.columns
  87
  88    def rename_column_references(
  89        self, table: str, old_column: str, new_column: str
  90    ) -> None:
  91        if self.table == table:
  92            for index, column in enumerate(self.columns):
  93                if column == old_column:
  94                    self.columns[index] = new_column
  95
  96
  97class Columns(TableColumns):
  98    """Hold a reference to one or many columns."""
  99
 100    def __init__(
 101        self,
 102        table: str,
 103        columns: list[str],
 104        col_suffixes: tuple[str, ...] = (),
 105        opclasses: tuple[str, ...] = (),
 106    ) -> None:
 107        self.col_suffixes = col_suffixes
 108        self.opclasses = opclasses
 109        super().__init__(table, columns)
 110
 111    def __str__(self) -> str:
 112        def col_str(column: str, idx: int) -> str:
 113            col = quote_name(column)
 114            # If opclasses are provided, include them
 115            if self.opclasses:
 116                col = f"{col} {self.opclasses[idx]}"
 117            try:
 118                suffix = self.col_suffixes[idx]
 119                if suffix:
 120                    col = f"{col} {suffix}"
 121            except IndexError:
 122                pass
 123            return col
 124
 125        return ", ".join(
 126            col_str(column, idx) for idx, column in enumerate(self.columns)
 127        )
 128
 129
 130class IndexName(TableColumns):
 131    """Hold a reference to an index name."""
 132
 133    def __init__(
 134        self,
 135        table: str,
 136        columns: list[str],
 137        suffix: str,
 138        create_index_name: Callable[[str, list[str], str], str],
 139    ) -> None:
 140        self.suffix = suffix
 141        self.create_index_name = create_index_name
 142        super().__init__(table, columns)
 143
 144    def __str__(self) -> str:
 145        return self.create_index_name(self.table, self.columns, self.suffix)
 146
 147
 148class ForeignKeyName(TableColumns):
 149    """Hold a reference to a foreign key name."""
 150
 151    def __init__(
 152        self,
 153        from_table: str,
 154        from_columns: list[str],
 155        to_table: str,
 156        to_columns: list[str],
 157        suffix_template: str,
 158        create_fk_name: Callable[[str, list[str], str], str],
 159    ) -> None:
 160        self.to_reference = TableColumns(to_table, to_columns)
 161        self.suffix_template = suffix_template
 162        self.create_fk_name = create_fk_name
 163        super().__init__(
 164            from_table,
 165            from_columns,
 166        )
 167
 168    def references_table(self, table: str) -> bool:
 169        return super().references_table(table) or self.to_reference.references_table(
 170            table
 171        )
 172
 173    def references_column(self, table: str, column: str) -> bool:
 174        return super().references_column(
 175            table, column
 176        ) or self.to_reference.references_column(table, column)
 177
 178    def rename_table_references(self, old_table: str, new_table: str) -> None:
 179        super().rename_table_references(old_table, new_table)
 180        self.to_reference.rename_table_references(old_table, new_table)
 181
 182    def rename_column_references(
 183        self, table: str, old_column: str, new_column: str
 184    ) -> None:
 185        super().rename_column_references(table, old_column, new_column)
 186        self.to_reference.rename_column_references(table, old_column, new_column)
 187
 188    def __str__(self) -> str:
 189        suffix = self.suffix_template % {
 190            "to_table": self.to_reference.table,
 191            "to_column": self.to_reference.columns[0],
 192        }
 193        return self.create_fk_name(self.table, self.columns, suffix)
 194
 195
 196class Statement:
 197    """
 198    Statement template and formatting parameters container.
 199
 200    Allows keeping a reference to a statement without interpolating identifiers
 201    that might have to be adjusted if they're referencing a table or column
 202    that is removed
 203    """
 204
 205    def __init__(self, template: str, **parts: Any) -> None:
 206        self.template = template
 207        self.parts = parts
 208
 209    def references_table(self, table: str) -> bool:
 210        return any(
 211            hasattr(part, "references_table") and part.references_table(table)
 212            for part in self.parts.values()
 213        )
 214
 215    def references_column(self, table: str, column: str) -> bool:
 216        return any(
 217            hasattr(part, "references_column") and part.references_column(table, column)
 218            for part in self.parts.values()
 219        )
 220
 221    def rename_table_references(self, old_table: str, new_table: str) -> None:
 222        for part in self.parts.values():
 223            if hasattr(part, "rename_table_references"):
 224                part.rename_table_references(old_table, new_table)
 225
 226    def rename_column_references(
 227        self, table: str, old_column: str, new_column: str
 228    ) -> None:
 229        for part in self.parts.values():
 230            if hasattr(part, "rename_column_references"):
 231                part.rename_column_references(table, old_column, new_column)
 232
 233    def __repr__(self) -> str:
 234        return f"<{self.__class__.__name__} {str(self)!r}>"
 235
 236    def __str__(self) -> str:
 237        return self.template % self.parts
 238
 239
 240class Expressions(TableColumns):
 241    def __init__(
 242        self,
 243        table: str,
 244        expressions: Any,
 245        compiler: SQLCompiler,
 246        quote_value: Callable[[Any], str],
 247    ) -> None:
 248        self.compiler = compiler
 249        self.expressions = expressions
 250        self.quote_value = quote_value
 251        columns = [
 252            col.target.column
 253            for col in self.compiler.query._gen_cols(iter([self.expressions]))
 254        ]
 255        super().__init__(table, columns)
 256
 257    def rename_table_references(self, old_table: str, new_table: str) -> None:
 258        if self.table != old_table:
 259            return
 260        self.expressions = self.expressions.relabeled_clone({old_table: new_table})
 261        super().rename_table_references(old_table, new_table)
 262
 263    def rename_column_references(
 264        self, table: str, old_column: str, new_column: str
 265    ) -> None:
 266        if self.table != table:
 267            return
 268        expressions = deepcopy(self.expressions)
 269        self.columns = []
 270        for col in self.compiler.query._gen_cols(iter([expressions])):
 271            if col.target.column == old_column:
 272                col.target.column = new_column
 273            self.columns.append(col.target.column)
 274        self.expressions = expressions
 275
 276    def __str__(self) -> str:
 277        sql, params = self.compiler.compile(self.expressions)
 278        params = map(self.quote_value, params)
 279        return sql % tuple(params)
 280
 281
 282# ##### Schema helper functions #####
 283
 284
 285def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
 286    """
 287    When altering the given field, must constraints on its model from the given
 288    relation be temporarily dropped?
 289    """
 290    from plain.models.fields.related import ManyToManyField
 291
 292    field = relation.field
 293    if isinstance(field, ManyToManyField):
 294        # M2M reverse field
 295        return False
 296    if altered_field.primary_key:
 297        # Foreign key constraint on the primary key, which is being altered.
 298        return True
 299    # ForeignKeyField always targets 'id'
 300    return altered_field.name == "id"
 301
 302
 303def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
 304    # Related fields must be returned in a deterministic order.
 305    return sorted(
 306        model._model_meta._get_fields(
 307            forward=False,
 308            reverse=True,
 309        ),
 310        key=operator.attrgetter("name"),
 311    )
 312
 313
 314def _related_non_m2m_objects(
 315    old_field: Field, new_field: Field
 316) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
 317    # Filter out m2m objects from reverse relations.
 318    # Return (old_relation, new_relation) tuples.
 319    related_fields = zip(
 320        (
 321            obj
 322            for obj in _all_related_fields(old_field.model)
 323            if _is_relevant_relation(obj, old_field)
 324        ),
 325        (
 326            obj
 327            for obj in _all_related_fields(new_field.model)
 328            if _is_relevant_relation(obj, new_field)
 329        ),
 330    )
 331    for old_rel, new_rel in related_fields:
 332        yield old_rel, new_rel
 333        yield from _related_non_m2m_objects(
 334            old_rel.remote_field,
 335            new_rel.remote_field,
 336        )
 337
 338
class DatabaseSchemaEditor:
    """
    Responsible for emitting schema-changing statements to PostgreSQL - model
    creation/removal/alteration, field renaming, index management, and so on.

    The `sql_*` class attributes below are %-style templates with named
    placeholders (for example `%(table)s`); methods fill them from a mapping
    before executing the result.
    """

    # --- Tables ---
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # --- Columns ---
    # sql_alter_column wraps one or more of the "ALTER COLUMN ..." fragments
    # below through its %(changes)s placeholder.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    # Setting all constraints to IMMEDIATE to allow changing data in the same transaction.
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
        "; SET CONSTRAINTS ALL IMMEDIATE"
    )

    # --- Constraints ---
    # The first two are bare constraint clauses; sql_constraint gives a clause
    # a name.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints; deletion reuses the generic DROP CONSTRAINT template.
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints; deletion reuses the generic DROP CONSTRAINT template.
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # --- Foreign keys ---
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # Setting the constraint to IMMEDIATE to allow changing data in the same transaction.
    sql_create_column_inline_fk = (
        "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s"
        "; SET CONSTRAINTS %(namespace)s%(name)s IMMEDIATE"
    )
    # Setting the constraint to IMMEDIATE runs any deferred checks to allow
    # dropping it in the same transaction.
    sql_delete_fk = (
        "SET CONSTRAINTS %(name)s IMMEDIATE; "
        "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    )

    # --- Indexes ---
    # The unique-index template has no %(using)s or %(extra)s placeholder,
    # unlike the two plain index templates.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s%(using)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_index_concurrently = (
        "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
    sql_delete_index_concurrently = "DROP INDEX CONCURRENTLY IF EXISTS %(name)s"

    # --- Primary keys; deletion reuses the generic DROP CONSTRAINT template ---
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # PostgreSQL IDENTITY column support
    sql_add_identity = (
        "ALTER TABLE %(table)s ALTER COLUMN %(column)s ADD "
        "GENERATED BY DEFAULT AS IDENTITY"
    )
    sql_drop_identity = (
        "ALTER TABLE %(table)s ALTER COLUMN %(column)s DROP IDENTITY IF EXISTS"
    )
    sql_alter_sequence_type = "ALTER SEQUENCE IF EXISTS %(sequence)s AS %(type)s"
    sql_delete_sequence = "DROP SEQUENCE IF EXISTS %(sequence)s CASCADE"
 427
 428    def __init__(
 429        self,
 430        connection: DatabaseWrapper,
 431        atomic: bool = True,
 432        collect_sql: bool = False,
 433    ):
 434        self.connection = connection
 435        self.collect_sql = collect_sql
 436        self.atomic_migration = atomic and not collect_sql
 437
 438    # State-managing methods
 439
 440    def __enter__(self) -> Self:
 441        self.deferred_sql: list[Any] = []
 442        self.executed_sql: list[str] = []
 443        if self.atomic_migration:
 444            self.atomic = atomic()
 445            self.atomic.__enter__()
 446        return self
 447
 448    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 449        if exc_type is None:
 450            for sql in self.deferred_sql:
 451                self.execute(sql)
 452        if self.atomic_migration:
 453            self.atomic.__exit__(exc_type, exc_value, traceback)
 454        self.deferred_sql.clear()
 455
 456    # Core utility functions
 457
 458    def execute(
 459        self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
 460    ) -> None:
 461        """Execute the given SQL statement, with optional parameters."""
 462        # Account for non-string statement objects.
 463        sql_str = str(sql)
 464
 465        # Merge the query client-side, as PostgreSQL won't do it server-side.
 466        if params is not None:
 467            sql_str = self.connection.compose_sql(sql_str, params)
 468            params = None
 469
 470        # Log the command we're running, then run it
 471        logger.debug(
 472            "%s; (params %r)", sql_str, params, extra={"params": params, "sql": sql_str}
 473        )
 474
 475        # Track executed SQL for display in migration output
 476        self.executed_sql.append(sql_str)
 477
 478        if self.collect_sql:
 479            return
 480
 481        with self.connection.cursor() as cursor:
 482            cursor.execute(sql_str, params)
 483
 484    def quote_value(self, value: Any) -> str:
 485        """
 486        Return a quoted version of the value so it's safe to use in an SQL
 487        string. This is not safe against injection from user code; it is
 488        intended only for use in making SQL scripts or preparing default values
 489        (which are not user-defined, so this is safe).
 490        """
 491        if isinstance(value, str):
 492            value = value.replace("%", "%%")
 493        return psycopg_sql.quote(value, self.connection.connection)
 494
 495    def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
 496        """Take a model and return its table definition."""
 497        # Create column SQL, add FK deferreds if needed.
 498        column_sqls = []
 499        params = []
 500        for field in model._model_meta.local_fields:
 501            # SQL.
 502            definition, extra_params = self.column_sql(model, field)
 503            if definition is None:
 504                continue
 505            # Check constraints can go on the column SQL here.
 506            db_params = field.db_parameters()
 507            if db_params["check"]:
 508                definition += " " + self.sql_check_constraint % db_params
 509            # Autoincrement SQL (e.g. GENERATED BY DEFAULT AS IDENTITY).
 510            col_type_suffix = field.db_type_suffix()
 511            if col_type_suffix:
 512                definition += f" {col_type_suffix}"
 513            if extra_params:
 514                params.extend(extra_params)
 515            # PostgreSQL creates FK constraints via deferred ALTER TABLE
 516            if isinstance(field, ForeignKeyField) and field.db_constraint:
 517                self.deferred_sql.append(
 518                    self._create_fk_sql(model, field, "_fk_%(to_table)s_%(to_column)s")
 519                )
 520            # Add the SQL to our big list.
 521            column_sqls.append(f"{quote_name(field.column)} {definition}")
 522        constraints = [
 523            constraint.constraint_sql(model, self)
 524            for constraint in model.model_options.constraints
 525        ]
 526        sql = self.sql_create_table % {
 527            "table": quote_name(model.model_options.db_table),
 528            "definition": ", ".join(
 529                str(constraint)
 530                for constraint in (*column_sqls, *constraints)
 531                if constraint
 532            ),
 533        }
 534        return sql, params
 535
 536    # Field <-> database mapping functions
 537
 538    def _iter_column_sql(
 539        self,
 540        column_db_type: str,
 541        params: list[Any],
 542        model: type[Model],
 543        field: Field,
 544        field_db_params: DbParameters,
 545        include_default: bool,
 546    ) -> Generator[str, None, None]:
 547        yield column_db_type
 548        # Work out nullability.
 549        null = field.allow_null
 550        # Include a default value, if requested.
 551        if include_default:
 552            default_value = self.effective_default(field)
 553            if default_value is not None:
 554                yield "DEFAULT %s"
 555                params.append(default_value)
 556
 557        if not null:
 558            yield "NOT NULL"
 559        else:
 560            yield "NULL"
 561
 562        if field.primary_key:
 563            yield "PRIMARY KEY"
 564
 565    def column_sql(
 566        self, model: type[Model], field: Field, include_default: bool = False
 567    ) -> tuple[str | None, list[Any] | None]:
 568        """
 569        Return the column definition for a field. The field must already have
 570        had set_attributes_from_name() called.
 571        """
 572        # Get the column's type and use that as the basis of the SQL.
 573        field_db_params = field.db_parameters()
 574        column_db_type = field_db_params["type"]
 575        # Check for fields that aren't actually columns (e.g. M2M).
 576        if column_db_type is None:
 577            return None, None
 578        params: list[Any] = []
 579        return (
 580            " ".join(
 581                # This appends to the params being returned.
 582                self._iter_column_sql(
 583                    column_db_type,
 584                    params,
 585                    model,
 586                    field,
 587                    field_db_params,
 588                    include_default,
 589                )
 590            ),
 591            params,
 592        )
 593
 594    @staticmethod
 595    def _effective_default(field: Field) -> Any:
 596        # This method allows testing its logic without a connection.
 597        if field.has_default():
 598            default = field.get_default()
 599        elif (
 600            not field.allow_null and not field.required and field.empty_strings_allowed
 601        ):
 602            if field.get_internal_type() == "BinaryField":
 603                default = b""
 604            else:
 605                default = ""
 606        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 607            internal_type = field.get_internal_type()
 608            if internal_type == "DateTimeField":
 609                default = timezone.now()
 610            else:
 611                default = datetime.now()
 612                if internal_type == "DateField":
 613                    default = default.date()
 614                elif internal_type == "TimeField":
 615                    default = default.time()
 616        else:
 617            default = None
 618        return default
 619
 620    def effective_default(self, field: Field) -> Any:
 621        """Return a field's effective database default value."""
 622        return field.get_db_prep_save(self._effective_default(field), self.connection)
 623
 624    # Actions
 625
 626    def create_model(self, model: type[Model]) -> None:
 627        """
 628        Create a table and any accompanying indexes or unique constraints for
 629        the given `model`.
 630        """
 631        sql, params = self.table_sql(model)
 632        # Prevent using [] as params, in the case a literal '%' is used in the
 633        # definition.
 634        self.execute(sql, params or None)
 635
 636        # Add any field indexes.
 637        self.deferred_sql.extend(self._model_indexes_sql(model))
 638
 639    def delete_model(self, model: type[Model]) -> None:
 640        """Delete a model from the database."""
 641
 642        # Delete the table
 643        self.execute(
 644            self.sql_delete_table
 645            % {
 646                "table": quote_name(model.model_options.db_table),
 647            }
 648        )
 649        # Remove all deferred statements referencing the deleted table.
 650        for sql in list(self.deferred_sql):
 651            if isinstance(sql, Statement) and sql.references_table(
 652                model.model_options.db_table
 653            ):
 654                self.deferred_sql.remove(sql)
 655
 656    def add_index(
 657        self, model: type[Model], index: Index, concurrently: bool = False
 658    ) -> None:
 659        """Add an index on a model."""
 660        self.execute(
 661            index.create_sql(model, self, concurrently=concurrently), params=None
 662        )
 663
 664    def remove_index(
 665        self, model: type[Model], index: Index, concurrently: bool = False
 666    ) -> None:
 667        """Remove an index from a model."""
 668        self.execute(index.remove_sql(model, self, concurrently=concurrently))
 669
 670    def rename_index(
 671        self, model: type[Model], old_index: Index, new_index: Index
 672    ) -> None:
 673        self.execute(
 674            self._rename_index_sql(model, old_index.name, new_index.name),
 675            params=None,
 676        )
 677
 678    def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 679        """Add a constraint to a model."""
 680        sql = constraint.create_sql(model, self)
 681        if sql:
 682            # Constraint.create_sql returns interpolated SQL which makes
 683            # params=None a necessity to avoid escaping attempts on execution.
 684            self.execute(sql, params=None)
 685
 686    def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
 687        """Remove a constraint from a model."""
 688        sql = constraint.remove_sql(model, self)
 689        if sql:
 690            self.execute(sql)
 691
 692    def alter_db_table(
 693        self, model: type[Model], old_db_table: str, new_db_table: str
 694    ) -> None:
 695        """Rename the table a model points to."""
 696        if old_db_table == new_db_table:
 697            return
 698        self.execute(
 699            self.sql_rename_table
 700            % {
 701                "old_table": quote_name(old_db_table),
 702                "new_table": quote_name(new_db_table),
 703            }
 704        )
 705        # Rename all references to the old table name.
 706        for sql in self.deferred_sql:
 707            if isinstance(sql, Statement):
 708                sql.rename_table_references(old_db_table, new_db_table)
 709
    def add_field(self, model: type[Model], field: Field) -> None:
        """
        Add the column for `field` to the model's table.

        Steps, in order:
        1. Build the column definition with its default included, then append
           the field's type suffix and check constraint when present.
        2. For a ForeignKeyField with `db_constraint` set, append an inline
           foreign key constraint to the definition.
        3. Execute the ADD COLUMN statement.
        4. If the field has an effective default, drop the column default
           again.
        5. Queue the field's index statements as deferred SQL.

        Fields for which column_sql() returns no definition are skipped
        entirely.

        Raises ValueError if a foreign key's remote `field_name` is None.
        """
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix():
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters()
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if isinstance(field, ForeignKeyField) and field.db_constraint:
            # Add FK constraint inline (PostgreSQL always supports this).
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            to_table = field.remote_field.model.model_options.db_table
            field_name = field.remote_field.field_name
            if field_name is None:
                raise ValueError("Foreign key field_name cannot be None")
            to_field = field.remote_field.model._model_meta.get_forward_field(
                field_name
            )
            to_column = to_field.column
            # NOTE(review): split_identifier presumably returns
            # (namespace, table name) - confirm against its definition.
            namespace, _ = split_identifier(model.model_options.db_table)
            # The inline FK template ends with a separate
            # "SET CONSTRAINTS ... IMMEDIATE" statement, so the final SQL
            # holds two statements. The "column" key is passed but the
            # template has no %(column)s placeholder.
            definition += " " + self.sql_create_column_inline_fk % {
                "name": self._fk_constraint_name(model, field, constraint_suffix),
                "namespace": f"{quote_name(namespace)}." if namespace else "",
                "column": quote_name(field.column),
                "to_table": quote_name(to_table),
                "to_column": quote_name(to_column),
                "deferrable": DEFERRABLE_SQL,
            }
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": quote_name(model.model_options.db_table),
            "column": quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if self.effective_default(field) is not None:
            # `params` is rebound here to the parameters of the default change.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
 766
 767    def remove_field(self, model: type[Model], field: Field) -> None:
 768        """
 769        Remove a field from a model. Usually involves deleting a column,
 770        but for M2Ms may involve deleting a table.
 771        """
 772        # It might not actually have a column behind it
 773        if field.db_parameters()["type"] is None:
 774            return
 775        # Drop any FK constraints
 776        if isinstance(field, RelatedField):
 777            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 778            for fk_name in fk_names:
 779                self.execute(
 780                    self._delete_constraint_sql(self.sql_delete_fk, model, fk_name)
 781                )
 782        # Delete the column
 783        sql = self.sql_delete_column % {
 784            "table": quote_name(model.model_options.db_table),
 785            "column": quote_name(field.column),
 786        }
 787        self.execute(sql)
 788        # Remove all deferred statements referencing the deleted column.
 789        for sql in list(self.deferred_sql):
 790            if isinstance(sql, Statement) and sql.references_column(
 791                model.model_options.db_table, field.column
 792            ):
 793                self.deferred_sql.remove(sql)
 794
 795    def alter_field(
 796        self,
 797        model: type[Model],
 798        old_field: Field,
 799        new_field: Field,
 800        strict: bool = False,
 801    ) -> None:
 802        """
 803        Allow a field's type, uniqueness, nullability, default, column,
 804        constraints, etc. to be modified.
 805        `old_field` is required to compute the necessary changes.
 806        If `strict` is True, raise errors if the old column does not match
 807        `old_field` precisely.
 808        """
 809        if not self._field_should_be_altered(old_field, new_field):
 810            return
 811        # Ensure this field is even column-based
 812        old_db_params = old_field.db_parameters()
 813        old_type = old_db_params["type"]
 814        new_db_params = new_field.db_parameters()
 815        new_type = new_db_params["type"]
 816        if (old_type is None and not isinstance(old_field, RelatedField)) or (
 817            new_type is None and not isinstance(new_field, RelatedField)
 818        ):
 819            raise ValueError(
 820                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 821                "db_type (are you using a badly-written custom field?)",
 822            )
 823        elif (
 824            old_type is None
 825            and new_type is None
 826            and isinstance(old_field, RelatedField)
 827            and isinstance(old_field.remote_field, ManyToManyRel)
 828            and isinstance(new_field, RelatedField)
 829            and isinstance(new_field.remote_field, ManyToManyRel)
 830        ):
 831            # Both sides have through models; this is a no-op.
 832            return
 833        elif old_type is None or new_type is None:
 834            raise ValueError(
 835                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 836                "(you cannot alter to or from M2M fields, or add or remove "
 837                "through= on M2M fields)"
 838            )
 839
 840        self._alter_field(
 841            model,
 842            old_field,
 843            new_field,
 844            old_type,
 845            new_type,
 846            old_db_params,
 847            new_db_params,
 848            strict,
 849        )
 850
 851    def _field_db_check(
 852        self, field: Field, field_db_params: DbParameters
 853    ) -> str | None:
 854        # Always check constraints with the same mocked column name to avoid
 855        # recreating constrains when the column is renamed.
 856        data = field.db_type_parameters()
 857        data["column"] = "__column_name__"
 858        try:
 859            return DATA_TYPE_CHECK_CONSTRAINTS[field.get_internal_type()] % data
 860        except KeyError:
 861            return None
 862
 863    def _field_data_type(
 864        self, field: Field
 865    ) -> str | None | Callable[[dict[str, Any]], str]:
 866        if isinstance(field, RelatedField):
 867            return field.rel_db_type()
 868        return DATA_TYPES.get(
 869            field.get_internal_type(),
 870            field.db_type(),
 871        )
 872
 873    def _get_sequence_name(self, table: str, column: str) -> str | None:
 874        with self.connection.cursor() as cursor:
 875            for sequence in self.connection.get_sequences(cursor, table):
 876                if sequence["column"] == column:
 877                    return sequence["name"]
 878        return None
 879
    def _alter_field(
        self,
        model: type[Model],
        old_field: Field,
        new_field: Field,
        old_type: str,
        new_type: str,
        old_db_params: DbParameters,
        new_db_params: DbParameters,
        strict: bool = False,
    ) -> None:
        """
        Perform a "physical" (non-ManyToMany) field update.

        Executes the DDL that turns the column described by `old_field` into
        the one described by `new_field`. Roughly in order: drop the
        constraints and indexes the change invalidates, rename the column,
        apply type/default/nullability changes, and finally (re)create the
        indexes, primary key, foreign keys and check constraint that the new
        field calls for.

        `old_type`/`new_type` and `old_db_params`/`new_db_params` are the
        values alter_field() read from each field's db_parameters().

        If `strict` is True, raise ValueError when the number of foreign key,
        unique or check constraints found on the old column is not exactly
        one. `strict` is also passed on to _delete_primary_key().
        """
        # Drop any FK constraints, we'll remake them later
        fks_dropped = set()
        if (
            isinstance(old_field, ForeignKeyField)
            and old_field.db_constraint
            and self._field_should_be_altered(
                old_field,
                new_field,
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for fk_name in fk_names:
                # Remembered so the FK is recreated near the end of this method.
                fks_dropped.add((old_field.column,))
                self.execute(
                    self._delete_constraint_sql(self.sql_delete_fk, model, fk_name)
                )
        # Old field was a primary key and the new one either is not, or
        # _field_became_primary_key() reports a change: drop the
        # non-primary-key unique constraints found on the old column.
        if old_field.primary_key and (
            not new_field.primary_key
            or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field, ignoring constraints
            # declared in model_options.constraints.
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                sql = self._delete_unique_sql(model, constraint_name)
                if sql is not None:
                    self.execute(sql)
        # Drop incoming FK constraints when the field stays a primary key but
        # its column type changes; they are rebuilt further down once the
        # referencing columns have been altered as well.
        drop_foreign_keys = (old_field.primary_key and new_field.primary_key) and (
            old_type != new_type
        )
        if drop_foreign_keys:
            # '_model_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(
                        self._delete_constraint_sql(
                            self.sql_delete_fk, new_rel.related_model, fk_name
                        )
                    )
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if db_index switched to False or a unique constraint
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        #
        # Note that db_index is only consulted for ForeignKeyField instances.
        if (
            isinstance(old_field, ForeignKeyField)
            and old_field.db_index
            and not old_field.primary_key
            and (
                not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
                or new_field.primary_key
            )
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model.model_options.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints? Drop the old type-level check here; the
        # new one (if any) is added near the end of this method.
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model.model_options.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                sql = self._delete_constraint_sql(
                    self.sql_delete_check, model, constraint_name
                )
                if sql is not None:
                    self.execute(sql)
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model.model_options.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model.model_options.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do. Each entry is a
        # (sql, params) pair:
        #  - actions: fragments combined into a single ALTER TABLE statement
        #  - null_actions: (NOT) NULL fragments, possibly run separately
        #  - post_actions: complete statements run after the ALTER TABLE
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix()
        new_type_suffix = new_field.db_type_suffix()
        # Type change?
        if old_type != new_type or old_type_suffix != new_type_suffix:
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type
            )
            actions.append(fragment)
            post_actions.extend(other_actions)
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if old_field.allow_null and not new_field.allow_null:
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if old_default != new_default and new_default is not None:
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.allow_null != new_field.allow_null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = new_field.has_default() and (
            old_field.allow_null and not new_field.allow_null
        )
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together: join the SQL fragments with ", " and
            # concatenate their parameter lists.
            if actions:
                sql, params = tuple(zip(*actions))
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": quote_name(model.model_options.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                # Update existing rows with default value. new_default is
                # bound here: four_way_default_alteration implies the
                # NULL -> NOT NULL branch above ran.
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": quote_name(model.model_options.db_table),
                        "column": quote_name(new_field.column),
                        "default": "%s",
                    },
                    [new_default],
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": quote_name(model.model_options.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)

        # Added an index? Add an index if db_index switched to True or a unique
        # constraint will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        #
        # Note that db_index is only consulted for ForeignKeyField instances.
        if (
            (
                not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
                or old_field.primary_key
            )
            and isinstance(new_field, ForeignKeyField)
            and new_field.db_index
            and not new_field.primary_key
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters()
            rel_type = rel_db_params["type"]
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
            )
            # fragment is a (sql, params) pair for the ALTER TABLE statement.
            self.execute(
                self.sql_alter_column
                % {
                    "table": quote_name(new_rel.related_model.model_options.db_table),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key? Create it when the old constraint was
        # dropped above, or when the old field had no FK constraint at all.
        if (
            isinstance(new_field, ForeignKeyField)
            and (
                fks_dropped
                or not isinstance(old_field, ForeignKeyField)
                or not old_field.db_constraint
            )
            and new_field.db_constraint
        ):
            self.execute(
                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model.model_options.db_table, [new_field.column], suffix="_check"
            )
            new_check = new_db_params["check"]
            assert new_check is not None  # Guaranteed by new_db_check check above
            sql = self._create_check_sql(model, constraint_name, new_check)
            if sql is not None:
                self.execute(sql)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": quote_name(model.model_options.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)

        # Added an index? Create any PostgreSQL-specific indexes.
        # _create_like_index_sql() may return None, in which case nothing runs.
        if (
            not (
                (isinstance(old_field, ForeignKeyField) and old_field.db_index)
                or old_field.primary_key
            )
            and isinstance(new_field, ForeignKeyField)
            and new_field.db_index
        ) or (not old_field.primary_key and new_field.primary_key):
            like_index_statement = self._create_like_index_sql(model, new_field)
            if like_index_statement is not None:
                self.execute(like_index_statement)

        # Removed an index? Drop any PostgreSQL-specific indexes.
        # The "_like" index name is recomputed from the table and old column.
        if old_field.primary_key and not (
            (isinstance(new_field, ForeignKeyField) and new_field.db_index)
            or new_field.primary_key
        ):
            index_to_remove = self._create_index_name(
                model.model_options.db_table, [old_field.column], suffix="_like"
            )
            self.execute(self._delete_index_sql(model, index_to_remove))
1228
1229    def _alter_column_null_sql(
1230        self, model: type[Model], old_field: Field, new_field: Field
1231    ) -> tuple[str, list[Any]]:
1232        """
1233        Return a (sql, params) fragment to set a column to null or non-null
1234        as required by new_field.
1235        """
1236        new_db_params = new_field.db_parameters()
1237        sql = (
1238            self.sql_alter_column_null
1239            if new_field.allow_null
1240            else self.sql_alter_column_not_null
1241        )
1242        return (
1243            sql
1244            % {
1245                "column": quote_name(new_field.column),
1246                "type": new_db_params["type"],
1247            },
1248            [],
1249        )
1250
1251    def _alter_column_default_sql(
1252        self,
1253        model: type[Model],
1254        old_field: Field | None,
1255        new_field: Field,
1256        drop: bool = False,
1257    ) -> tuple[str, list[Any]]:
1258        """
1259        Return a (sql, params) fragment to add or drop (depending on the drop
1260        argument) a default to new_field's column.
1261        """
1262        new_default = self.effective_default(new_field)
1263        params: list[Any] = [] if drop else [new_default]
1264
1265        new_db_params = new_field.db_parameters()
1266        if drop:
1267            # PostgreSQL uses the same SQL for nullable and non-nullable columns
1268            sql = self.sql_alter_column_no_default
1269        else:
1270            sql = self.sql_alter_column_default
1271        return (
1272            sql
1273            % {
1274                "column": quote_name(new_field.column),
1275                "type": new_db_params["type"],
1276                "default": "%s",
1277            },
1278            params,
1279        )
1280
1281    def _alter_column_type_sql(
1282        self,
1283        model: type[Model],
1284        old_field: Field,
1285        new_field: Field,
1286        new_type: str,
1287    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1288        """
1289        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1290        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1291        run once the field is altered. Handles IDENTITY column transitions.
1292        """
1293        # Drop indexes on varchar/text/citext columns that are changing to a
1294        # different type.
1295        old_db_params = old_field.db_parameters()
1296        old_type = old_db_params["type"]
1297        assert old_type is not None, "old_type cannot be None for primary key field"
1298        if old_field.primary_key and (
1299            (old_type.startswith("varchar") and not new_type.startswith("varchar"))
1300            or (old_type.startswith("text") and not new_type.startswith("text"))
1301            or (old_type.startswith("citext") and not new_type.startswith("citext"))
1302        ):
1303            index_name = self._create_index_name(
1304                model.model_options.db_table, [old_field.column], suffix="_like"
1305            )
1306            self.execute(self._delete_index_sql(model, index_name))
1307
1308        self.sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
1309        # Cast when data type changed.
1310        if self._field_data_type(old_field) != self._field_data_type(new_field):
1311            self.sql_alter_column_type += " USING %(column)s::%(type)s"
1312        new_internal_type = new_field.get_internal_type()
1313        old_internal_type = old_field.get_internal_type()
1314        # Make ALTER TYPE with IDENTITY make sense.
1315        table = strip_quotes(model.model_options.db_table)
1316        auto_field_types = {"PrimaryKeyField"}
1317        old_is_auto = old_internal_type in auto_field_types
1318        new_is_auto = new_internal_type in auto_field_types
1319        if new_is_auto and not old_is_auto:
1320            column = strip_quotes(new_field.column)
1321            return (
1322                (
1323                    self.sql_alter_column_type
1324                    % {
1325                        "column": quote_name(column),
1326                        "type": new_type,
1327                    },
1328                    [],
1329                ),
1330                [
1331                    (
1332                        self.sql_add_identity
1333                        % {
1334                            "table": quote_name(table),
1335                            "column": quote_name(column),
1336                        },
1337                        [],
1338                    ),
1339                ],
1340            )
1341        elif old_is_auto and not new_is_auto:
1342            # Drop IDENTITY if exists (pre-Plain 4.1 serial columns don't have
1343            # it).
1344            self.execute(
1345                self.sql_drop_identity
1346                % {
1347                    "table": quote_name(table),
1348                    "column": quote_name(strip_quotes(new_field.column)),
1349                }
1350            )
1351            column = strip_quotes(new_field.column)
1352            fragment, _ = self._alter_column_type_sql_base(
1353                model, old_field, new_field, new_type
1354            )
1355            # Drop the sequence if exists (Plain 4.1+ identity columns don't
1356            # have it).
1357            other_actions: list[tuple[str, list[Any]]] = []
1358            if sequence_name := self._get_sequence_name(table, column):
1359                other_actions = [
1360                    (
1361                        self.sql_delete_sequence
1362                        % {
1363                            "sequence": quote_name(sequence_name),
1364                        },
1365                        [],
1366                    )
1367                ]
1368            return fragment, other_actions
1369        elif new_is_auto and old_is_auto and old_internal_type != new_internal_type:
1370            fragment, _ = self._alter_column_type_sql_base(
1371                model, old_field, new_field, new_type
1372            )
1373            column = strip_quotes(new_field.column)
1374            db_types = {"PrimaryKeyField": "bigint"}
1375            # Alter the sequence type if exists (Plain 4.1+ identity columns
1376            # don't have it).
1377            other_actions: list[tuple[str, list[Any]]] = []
1378            if sequence_name := self._get_sequence_name(table, column):
1379                other_actions = [
1380                    (
1381                        self.sql_alter_sequence_type
1382                        % {
1383                            "sequence": quote_name(sequence_name),
1384                            "type": db_types[new_internal_type],
1385                        },
1386                        [],
1387                    ),
1388                ]
1389            return fragment, other_actions
1390        else:
1391            return self._alter_column_type_sql_base(
1392                model, old_field, new_field, new_type
1393            )
1394
1395    def _alter_column_type_sql_base(
1396        self,
1397        model: type[Model],
1398        old_field: Field,
1399        new_field: Field,
1400        new_type: str,
1401    ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1402        """Base implementation of _alter_column_type_sql without IDENTITY handling."""
1403        return (
1404            (
1405                self.sql_alter_column_type
1406                % {
1407                    "column": quote_name(new_field.column),
1408                    "type": new_type,
1409                },
1410                [],
1411            ),
1412            [],
1413        )
1414
1415    def _create_index_name(
1416        self, table_name: str, column_names: list[str], suffix: str = ""
1417    ) -> str:
1418        """
1419        Generate a unique name for an index/unique constraint.
1420
1421        The name is divided into 3 parts: the table name, the column names,
1422        and a unique digest and suffix.
1423        """
1424        _, table_name = split_identifier(table_name)
1425        hash_suffix_part = (
1426            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1427        )
1428        max_length = MAX_NAME_LENGTH
1429        # If everything fits into max_length, use that name.
1430        index_name = "{}_{}_{}".format(
1431            table_name, "_".join(column_names), hash_suffix_part
1432        )
1433        if len(index_name) <= max_length:
1434            return index_name
1435        # Shorten a long suffix.
1436        if len(hash_suffix_part) > max_length / 3:
1437            hash_suffix_part = hash_suffix_part[: max_length // 3]
1438        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1439        index_name = "{}_{}_{}".format(
1440            table_name[:other_length],
1441            "_".join(column_names)[:other_length],
1442            hash_suffix_part,
1443        )
1444        # Prepend D if needed to prevent the name from starting with an
1445        # underscore or a number.
1446        if index_name[0] == "_" or index_name[0].isdigit():
1447            index_name = f"D{index_name[:-1]}"
1448        return index_name
1449
1450    def _index_include_sql(
1451        self, model: type[Model], columns: list[str] | None
1452    ) -> str | Statement:
1453        if not columns:
1454            return ""
1455        return Statement(
1456            " INCLUDE (%(columns)s)",
1457            columns=Columns(model.model_options.db_table, columns),
1458        )
1459
1460    def _create_index_sql(
1461        self,
1462        model: type[Model],
1463        *,
1464        fields: list[Field] | None = None,
1465        name: str | None = None,
1466        suffix: str = "",
1467        using: str = "",
1468        col_suffixes: tuple[str, ...] = (),
1469        sql: str | None = None,
1470        opclasses: tuple[str, ...] = (),
1471        condition: str | None = None,
1472        concurrently: bool = False,
1473        include: list[str] | None = None,
1474        expressions: Any = None,
1475    ) -> Statement:
1476        """
1477        Return the SQL statement to create the index for one or several fields
1478        or expressions. `sql` can be specified if the syntax differs from the
1479        standard (GIS indexes, ...).
1480        """
1481        fields = fields or []
1482        expressions = expressions or []
1483        compiler = Query(model, alias_cols=False).get_compiler()
1484        columns = [field.column for field in fields]
1485        if sql is None:
1486            sql = (
1487                self.sql_create_index
1488                if not concurrently
1489                else self.sql_create_index_concurrently
1490            )
1491        table = model.model_options.db_table
1492
1493        def create_index_name(*args: Any, **kwargs: Any) -> str:
1494            nonlocal name
1495            if name is None:
1496                name = self._create_index_name(*args, **kwargs)
1497            return quote_name(name)
1498
1499        return Statement(
1500            sql,
1501            table=Table(table),
1502            name=IndexName(table, columns, suffix, create_index_name),
1503            using=using,
1504            columns=(
1505                self._index_columns(table, columns, col_suffixes, opclasses)
1506                if columns
1507                else Expressions(table, expressions, compiler, self.quote_value)
1508            ),
1509            extra="",
1510            condition=(" WHERE " + condition if condition else ""),
1511            include=self._index_include_sql(model, include),
1512        )
1513
1514    def _delete_index_sql(
1515        self,
1516        model: type[Model],
1517        name: str,
1518        sql: str | None = None,
1519        concurrently: bool = False,
1520    ) -> Statement:
1521        if sql is None:
1522            sql = (
1523                self.sql_delete_index_concurrently
1524                if concurrently
1525                else self.sql_delete_index
1526            )
1527        return Statement(
1528            sql,
1529            table=Table(model.model_options.db_table),
1530            name=quote_name(name),
1531        )
1532
1533    def _rename_index_sql(
1534        self, model: type[Model], old_name: str, new_name: str
1535    ) -> Statement:
1536        return Statement(
1537            self.sql_rename_index,
1538            table=Table(model.model_options.db_table),
1539            old_name=quote_name(old_name),
1540            new_name=quote_name(new_name),
1541        )
1542
1543    def _index_columns(
1544        self,
1545        table: str,
1546        columns: list[str],
1547        col_suffixes: tuple[str, ...],
1548        opclasses: tuple[str, ...],
1549    ) -> Columns:
1550        return Columns(
1551            table,
1552            columns,
1553            col_suffixes=col_suffixes,
1554            opclasses=opclasses,
1555        )
1556
1557    def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1558        """
1559        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1560        """
1561        output: list[Statement | None] = []
1562        for field in model._model_meta.local_fields:
1563            output.extend(self._field_indexes_sql(model, field))
1564
1565        for index in model.model_options.indexes:
1566            if not index.contains_expressions:
1567                output.append(index.create_sql(model, self))
1568        return output
1569
1570    def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1571        """
1572        Return a list of all index SQL statements for the specified field.
1573        """
1574        output: list[Statement] = []
1575        if self._field_should_be_indexed(model, field):
1576            output.append(self._create_index_sql(model, fields=[field]))
1577        # Add LIKE index for varchar/text primary keys
1578        like_index_statement = self._create_like_index_sql(model, field)
1579        if like_index_statement is not None:
1580            output.append(like_index_statement)
1581        return output
1582
1583    def _create_like_index_sql(
1584        self, model: type[Model], field: Field
1585    ) -> Statement | None:
1586        """
1587        Return the statement to create an index with varchar operator pattern
1588        when the column type is 'varchar' or 'text', otherwise return None.
1589        """
1590        db_type = field.db_type()
1591        if db_type is not None and field.primary_key:
1592            # Fields with database column types of `varchar` and `text` need
1593            # a second index that specifies their operator class, which is
1594            # needed when performing correct LIKE queries outside the
1595            # C locale. See #12234.
1596            #
1597            # The same doesn't apply to array fields such as varchar[size]
1598            # and text[size], so skip them.
1599            if "[" in db_type:
1600                return None
1601            if db_type.startswith("varchar"):
1602                return self._create_index_sql(
1603                    model,
1604                    fields=[field],
1605                    suffix="_like",
1606                    opclasses=("varchar_pattern_ops",),
1607                )
1608            elif db_type.startswith("text"):
1609                return self._create_index_sql(
1610                    model,
1611                    fields=[field],
1612                    suffix="_like",
1613                    opclasses=("text_pattern_ops",),
1614                )
1615        return None
1616
1617    def _field_should_be_altered(
1618        self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1619    ) -> bool:
1620        ignore = ignore or set()
1621        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1622        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1623        # Don't alter when:
1624        # - changing only a field name
1625        # - changing an attribute that doesn't affect the schema
1626        # - changing an attribute in the provided set of ignored attributes
1627        for attr in ignore.union(old_field.non_db_attrs):
1628            old_kwargs.pop(attr, None)
1629        for attr in ignore.union(new_field.non_db_attrs):
1630            new_kwargs.pop(attr, None)
1631        return quote_name(old_field.column) != quote_name(new_field.column) or (
1632            old_path,
1633            old_args,
1634            old_kwargs,
1635        ) != (new_path, new_args, new_kwargs)
1636
1637    def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1638        if isinstance(field, ForeignKeyField):
1639            return bool(field.remote_field) and field.db_index and not field.primary_key
1640        return False
1641
1642    def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1643        return not old_field.primary_key and new_field.primary_key
1644
1645    def _rename_field_sql(
1646        self, table: str, old_field: Field, new_field: Field, new_type: str
1647    ) -> str:
1648        return self.sql_rename_column % {
1649            "table": quote_name(table),
1650            "old_column": quote_name(old_field.column),
1651            "new_column": quote_name(new_field.column),
1652            "type": new_type,
1653        }
1654
1655    def _create_fk_sql(
1656        self, model: type[Model], field: ForeignKeyField, suffix: str
1657    ) -> Statement:
1658        table = Table(model.model_options.db_table)
1659        name = self._fk_constraint_name(model, field, suffix)
1660        column = Columns(model.model_options.db_table, [field.column])
1661        to_table = Table(field.target_field.model.model_options.db_table)
1662        to_column = Columns(
1663            field.target_field.model.model_options.db_table,
1664            [field.target_field.column],
1665        )
1666        deferrable = DEFERRABLE_SQL
1667        return Statement(
1668            self.sql_create_fk,
1669            table=table,
1670            name=name,
1671            column=column,
1672            to_table=to_table,
1673            to_column=to_column,
1674            deferrable=deferrable,
1675        )
1676
1677    def _fk_constraint_name(
1678        self, model: type[Model], field: ForeignKeyField, suffix: str
1679    ) -> ForeignKeyName:
1680        def create_fk_name(*args: Any, **kwargs: Any) -> str:
1681            return quote_name(self._create_index_name(*args, **kwargs))
1682
1683        return ForeignKeyName(
1684            model.model_options.db_table,
1685            [field.column],
1686            split_identifier(field.target_field.model.model_options.db_table)[1],
1687            [field.target_field.column],
1688            suffix,
1689            create_fk_name,
1690        )
1691
1692    def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1693        if deferrable is None:
1694            return ""
1695        if deferrable == Deferrable.DEFERRED:
1696            return " DEFERRABLE INITIALLY DEFERRED"
1697        if deferrable == Deferrable.IMMEDIATE:
1698            return " DEFERRABLE INITIALLY IMMEDIATE"
1699        return ""
1700
1701    def _unique_sql(
1702        self,
1703        model: type[Model],
1704        fields: Iterable[Field],
1705        name: str,
1706        condition: str | None = None,
1707        deferrable: Deferrable | None = None,
1708        include: list[str] | None = None,
1709        opclasses: tuple[str, ...] | None = None,
1710        expressions: Any = None,
1711    ) -> str | None:
1712        if condition or include or opclasses or expressions:
1713            # Databases support conditional, covering, and functional unique
1714            # constraints via a unique index.
1715            sql = self._create_unique_sql(
1716                model,
1717                fields,
1718                name=name,
1719                condition=condition,
1720                include=include,
1721                opclasses=opclasses,
1722                expressions=expressions,
1723            )
1724            if sql:
1725                self.deferred_sql.append(sql)
1726            return None
1727        constraint = self.sql_unique_constraint % {
1728            "columns": ", ".join([quote_name(field.column) for field in fields]),
1729            "deferrable": self._deferrable_constraint_sql(deferrable),
1730        }
1731        return self.sql_constraint % {
1732            "name": quote_name(name),
1733            "constraint": constraint,
1734        }
1735
1736    def _create_unique_sql(
1737        self,
1738        model: type[Model],
1739        fields: Iterable[Field],
1740        name: str | None = None,
1741        condition: str | None = None,
1742        deferrable: Deferrable | None = None,
1743        include: list[str] | None = None,
1744        opclasses: tuple[str, ...] | None = None,
1745        expressions: Any = None,
1746    ) -> Statement | None:
1747        compiler = Query(model, alias_cols=False).get_compiler()
1748        table = model.model_options.db_table
1749        columns = [field.column for field in fields]
1750        constraint_name: IndexName | str
1751        if name is None:
1752            constraint_name = self._unique_constraint_name(table, columns, quote=True)
1753        else:
1754            constraint_name = quote_name(name)
1755        if condition or include or opclasses or expressions:
1756            sql = self.sql_create_unique_index
1757        else:
1758            sql = self.sql_create_unique
1759        if columns:
1760            columns_obj: Columns | Expressions = self._index_columns(
1761                table, columns, col_suffixes=(), opclasses=opclasses or ()
1762            )
1763        else:
1764            columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1765        return Statement(
1766            sql,
1767            table=Table(table),
1768            name=constraint_name,
1769            columns=columns_obj,
1770            condition=(" WHERE " + condition if condition else ""),
1771            deferrable=self._deferrable_constraint_sql(deferrable),
1772            include=self._index_include_sql(model, include),
1773        )
1774
1775    def _unique_constraint_name(
1776        self, table: str, columns: list[str], quote: bool = True
1777    ) -> IndexName | str:
1778        if quote:
1779
1780            def create_unique_name(*args: Any, **kwargs: Any) -> str:
1781                return quote_name(self._create_index_name(*args, **kwargs))
1782
1783        else:
1784            create_unique_name = self._create_index_name
1785
1786        return IndexName(table, columns, "_uniq", create_unique_name)
1787
1788    def _delete_unique_sql(
1789        self,
1790        model: type[Model],
1791        name: str,
1792        condition: str | None = None,
1793        deferrable: Deferrable | None = None,
1794        include: list[str] | None = None,
1795        opclasses: tuple[str, ...] | None = None,
1796        expressions: Any = None,
1797    ) -> Statement:
1798        if condition or include or opclasses or expressions:
1799            sql = self.sql_delete_index
1800        else:
1801            sql = self.sql_delete_unique
1802        return self._delete_constraint_sql(sql, model, name)
1803
1804    def _check_sql(self, name: str, check: str) -> str:
1805        return self.sql_constraint % {
1806            "name": quote_name(name),
1807            "constraint": self.sql_check_constraint % {"check": check},
1808        }
1809
1810    def _create_check_sql(self, model: type[Model], name: str, check: str) -> Statement:
1811        return Statement(
1812            self.sql_create_check,
1813            table=Table(model.model_options.db_table),
1814            name=quote_name(name),
1815            check=check,
1816        )
1817
1818    def _delete_constraint_sql(
1819        self, template: str, model: type[Model], name: str
1820    ) -> Statement:
1821        return Statement(
1822            template,
1823            table=Table(model.model_options.db_table),
1824            name=quote_name(name),
1825        )
1826
1827    def _constraint_names(
1828        self,
1829        model: type[Model],
1830        column_names: list[str] | None = None,
1831        unique: bool | None = None,
1832        primary_key: bool | None = None,
1833        index: bool | None = None,
1834        foreign_key: bool | None = None,
1835        check: bool | None = None,
1836        type_: str | None = None,
1837        exclude: set[str] | None = None,
1838    ) -> list[str]:
1839        """Return all constraint names matching the columns and conditions."""
1840        with self.connection.cursor() as cursor:
1841            constraints = self.connection.get_constraints(
1842                cursor, model.model_options.db_table
1843            )
1844        result: list[str] = []
1845        for name, infodict in constraints.items():
1846            if column_names is None or column_names == infodict["columns"]:
1847                if unique is not None and infodict["unique"] != unique:
1848                    continue
1849                if primary_key is not None and infodict["primary_key"] != primary_key:
1850                    continue
1851                if index is not None and infodict["index"] != index:
1852                    continue
1853                if check is not None and infodict["check"] != check:
1854                    continue
1855                if foreign_key is not None and not infodict["foreign_key"]:
1856                    continue
1857                if type_ is not None and infodict["type"] != type_:
1858                    continue
1859                if not exclude or name not in exclude:
1860                    result.append(name)
1861        return result
1862
1863    def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1864        constraint_names = self._constraint_names(model, primary_key=True)
1865        if strict and len(constraint_names) != 1:
1866            raise ValueError(
1867                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1868            )
1869        for constraint_name in constraint_names:
1870            self.execute(self._delete_primary_key_sql(model, constraint_name))
1871
1872    def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1873        return Statement(
1874            self.sql_create_pk,
1875            table=Table(model.model_options.db_table),
1876            name=quote_name(
1877                self._create_index_name(
1878                    model.model_options.db_table, [field.column], suffix="_pk"
1879                )
1880            ),
1881            columns=Columns(model.model_options.db_table, [field.column]),
1882        )
1883
1884    def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1885        return self._delete_constraint_sql(self.sql_delete_pk, model, name)