Plain is headed towards 1.0! Subscribe for development updates →

   1import logging
   2import operator
   3from datetime import datetime
   4
   5from plain.models.backends.ddl_references import (
   6    Columns,
   7    Expressions,
   8    ForeignKeyName,
   9    IndexName,
  10    Statement,
  11    Table,
  12)
  13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  14from plain.models.constraints import Deferrable
  15from plain.models.indexes import Index
  16from plain.models.sql import Query
  17from plain.models.transaction import TransactionManagementError, atomic
  18from plain.utils import timezone
  19
  20logger = logging.getLogger("plain.models.backends.schema")
  21
  22
  23def _is_relevant_relation(relation, altered_field):
  24    """
  25    When altering the given field, must constraints on its model from the given
  26    relation be temporarily dropped?
  27    """
  28    field = relation.field
  29    if field.many_to_many:
  30        # M2M reverse field
  31        return False
  32    if altered_field.primary_key:
  33        # Foreign key constraint on the primary key, which is being altered.
  34        return True
  35    # ForeignKey always targets 'id'
  36    return altered_field.name == "id"
  37
  38
  39def _all_related_fields(model):
  40    # Related fields must be returned in a deterministic order.
  41    return sorted(
  42        model._meta._get_fields(
  43            forward=False,
  44            reverse=True,
  45            include_hidden=True,
  46        ),
  47        key=operator.attrgetter("name"),
  48    )
  49
  50
  51def _related_non_m2m_objects(old_field, new_field):
  52    # Filter out m2m objects from reverse relations.
  53    # Return (old_relation, new_relation) tuples.
  54    related_fields = zip(
  55        (
  56            obj
  57            for obj in _all_related_fields(old_field.model)
  58            if _is_relevant_relation(obj, old_field)
  59        ),
  60        (
  61            obj
  62            for obj in _all_related_fields(new_field.model)
  63            if _is_relevant_relation(obj, new_field)
  64        ),
  65    )
  66    for old_rel, new_rel in related_fields:
  67        yield old_rel, new_rel
  68        yield from _related_non_m2m_objects(
  69            old_rel.remote_field,
  70            new_rel.remote_field,
  71        )
  72
  73
  74class BaseDatabaseSchemaEditor:
  75    """
  76    This class and its subclasses are responsible for emitting schema-changing
  77    statements to the databases - model creation/removal/alteration, field
  78    renaming, index fiddling, and so on.
  79    """
  80
    # Overrideable SQL templates
    # Each template is a %-format string filled in with a mapping of named
    # placeholders (for example %(table)s or %(column)s).

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # fill the %(changes)s placeholder of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic DROP CONSTRAINT statement that the
    # sql_delete_* aliases below reuse.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    # Check constraints.
    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    # Unique constraints.
    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The inline variants default to None; table_sql() and
    # add_field() only emit inline FK SQL when they are set.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    # Table and column comments.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"

    def __init__(self, connection, collect_sql=False, atomic=True):
 142
 143    def __init__(self, connection, collect_sql=False, atomic=True):
 144        self.connection = connection
 145        self.collect_sql = collect_sql
 146        if self.collect_sql:
 147            self.collected_sql = []
 148        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 149
 150    # State-managing methods
 151
 152    def __enter__(self):
 153        self.deferred_sql = []
 154        if self.atomic_migration:
 155            self.atomic = atomic()
 156            self.atomic.__enter__()
 157        return self
 158
 159    def __exit__(self, exc_type, exc_value, traceback):
 160        if exc_type is None:
 161            for sql in self.deferred_sql:
 162                self.execute(sql)
 163        if self.atomic_migration:
 164            self.atomic.__exit__(exc_type, exc_value, traceback)
 165
 166    # Core utility functions
 167
 168    def execute(self, sql, params=()):
 169        """Execute the given SQL statement, with optional parameters."""
 170        # Don't perform the transactional DDL check if SQL is being collected
 171        # as it's not going to be executed anyway.
 172        if (
 173            not self.collect_sql
 174            and self.connection.in_atomic_block
 175            and not self.connection.features.can_rollback_ddl
 176        ):
 177            raise TransactionManagementError(
 178                "Executing DDL statements while in a transaction on databases "
 179                "that can't perform a rollback is prohibited."
 180            )
 181        # Account for non-string statement objects.
 182        sql = str(sql)
 183        # Log the command we're running, then run it
 184        logger.debug(
 185            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 186        )
 187        if self.collect_sql:
 188            ending = "" if sql.rstrip().endswith(";") else ";"
 189            if params is not None:
 190                self.collected_sql.append(
 191                    (sql % tuple(map(self.quote_value, params))) + ending
 192                )
 193            else:
 194                self.collected_sql.append(sql + ending)
 195        else:
 196            with self.connection.cursor() as cursor:
 197                cursor.execute(sql, params)
 198
 199    def quote_name(self, name):
 200        return self.connection.ops.quote_name(name)
 201
    def table_sql(self, model):
        """
        Take a model and return its table definition.

        Returns a `(sql, params)` tuple: the statement built from
        `sql_create_table` and the list of parameters collected from the
        column definitions. As a side effect, foreign key and autoincrement
        statements that are not emitted inline are appended to
        `self.deferred_sql`.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            # column_sql() returns (None, None) for fields without a column.
            if definition is None:
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += f" {col_type_suffix}"
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                if self.sql_create_inline_fk:
                    # Inline template available: append it to the column.
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    # Otherwise queue a separate FK statement for later.
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in ("PrimaryKeyField",):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model._meta.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model._meta.constraints
        ]
        # Columns come first, then constraints; falsy entries are dropped.
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        return sql, params
 261
 262    # Field <-> database mapping functions
 263
    def _iter_column_sql(
        self, column_db_type, params, model, field, field_db_params, include_default
    ):
        """
        Yield, in order, the SQL fragments of a column definition: the type,
        then optional collation, inline comment, DEFAULT clause, nullability
        and PRIMARY KEY.

        `params` is mutated: when a default is emitted as a placeholder, its
        value is appended to the list. `model` is not used in this body.
        """
        yield column_db_type
        if collation := field_db_params.get("collation"):
            yield self._collate_sql(collation)
        if self.connection.features.supports_comments_inline and field.db_comment:
            yield self._comment_sql(field.db_comment)
        # Work out nullability.
        null = field.allow_null
        # Include a default value, if requested.
        include_default = (
            include_default
            and not self.skip_default(field)
            and
            # Don't include a default value if it's a nullable field and the
            # default cannot be dropped in the ALTER COLUMN statement (e.g.
            # MySQL longtext and longblob).
            not (null and self.skip_default_on_alter(field))
        )
        if include_default:
            default_value = self.effective_default(field)
            # A None default produces no DEFAULT clause at all.
            if default_value is not None:
                column_default = "DEFAULT " + self._column_default_sql(field)
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter (Oracle).
                    # If this is the case, the individual schema backend should
                    # implement prepare_default().
                    yield column_default % self.prepare_default(default_value)
                else:
                    yield column_default
                    params.append(default_value)

        if not null:
            yield "NOT NULL"
        elif not self.connection.features.implied_column_null:
            # Explicit NULL is only emitted when the backend does not imply it.
            yield "NULL"

        if field.primary_key:
            yield "PRIMARY KEY"
 304
 305    def column_sql(self, model, field, include_default=False):
 306        """
 307        Return the column definition for a field. The field must already have
 308        had set_attributes_from_name() called.
 309        """
 310        # Get the column's type and use that as the basis of the SQL.
 311        field_db_params = field.db_parameters(connection=self.connection)
 312        column_db_type = field_db_params["type"]
 313        # Check for fields that aren't actually columns (e.g. M2M).
 314        if column_db_type is None:
 315            return None, None
 316        params = []
 317        return (
 318            " ".join(
 319                # This appends to the params being returned.
 320                self._iter_column_sql(
 321                    column_db_type,
 322                    params,
 323                    model,
 324                    field,
 325                    field_db_params,
 326                    include_default,
 327                )
 328            ),
 329            params,
 330        )
 331
 332    def skip_default(self, field):
 333        """
 334        Some backends don't accept default values for certain columns types
 335        (i.e. MySQL longtext and longblob).
 336        """
 337        return False
 338
 339    def skip_default_on_alter(self, field):
 340        """
 341        Some backends don't accept default values for certain columns types
 342        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 343        """
 344        return False
 345
 346    def prepare_default(self, value):
 347        """
 348        Only used for backends which have requires_literal_defaults feature
 349        """
 350        raise NotImplementedError(
 351            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 352            "requires_literal_defaults must provide a prepare_default() method"
 353        )
 354
 355    def _column_default_sql(self, field):
 356        """
 357        Return the SQL to use in a DEFAULT clause. The resulting string should
 358        contain a '%s' placeholder for a default value.
 359        """
 360        return "%s"
 361
 362    @staticmethod
 363    def _effective_default(field):
 364        # This method allows testing its logic without a connection.
 365        if field.has_default():
 366            default = field.get_default()
 367        elif (
 368            not field.allow_null and not field.required and field.empty_strings_allowed
 369        ):
 370            if field.get_internal_type() == "BinaryField":
 371                default = b""
 372            else:
 373                default = ""
 374        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 375            internal_type = field.get_internal_type()
 376            if internal_type == "DateTimeField":
 377                default = timezone.now()
 378            else:
 379                default = datetime.now()
 380                if internal_type == "DateField":
 381                    default = default.date()
 382                elif internal_type == "TimeField":
 383                    default = default.time()
 384        else:
 385            default = None
 386        return default
 387
 388    def effective_default(self, field):
 389        """Return a field's effective database default value."""
 390        return field.get_db_prep_save(self._effective_default(field), self.connection)
 391
 392    def quote_value(self, value):
 393        """
 394        Return a quoted version of the value so it's safe to use in an SQL
 395        string. This is not safe against injection from user code; it is
 396        intended only for use in making SQL scripts or preparing default values
 397        for particularly tricky backends (defaults are not user-defined, though,
 398        so this is safe).
 399        """
 400        raise NotImplementedError()
 401
 402    # Actions
 403
 404    def create_model(self, model):
 405        """
 406        Create a table and any accompanying indexes or unique constraints for
 407        the given `model`.
 408        """
 409        sql, params = self.table_sql(model)
 410        # Prevent using [] as params, in the case a literal '%' is used in the
 411        # definition.
 412        self.execute(sql, params or None)
 413
 414        if self.connection.features.supports_comments:
 415            # Add table comment.
 416            if model._meta.db_table_comment:
 417                self.alter_db_table_comment(model, None, model._meta.db_table_comment)
 418            # Add column comments.
 419            if not self.connection.features.supports_comments_inline:
 420                for field in model._meta.local_fields:
 421                    if field.db_comment:
 422                        field_db_params = field.db_parameters(
 423                            connection=self.connection
 424                        )
 425                        field_type = field_db_params["type"]
 426                        self.execute(
 427                            *self._alter_column_comment_sql(
 428                                model, field, field_type, field.db_comment
 429                            )
 430                        )
 431        # Add any field index (deferred as SQLite _remake_table needs it).
 432        self.deferred_sql.extend(self._model_indexes_sql(model))
 433
 434    def delete_model(self, model):
 435        """Delete a model from the database."""
 436
 437        # Delete the table
 438        self.execute(
 439            self.sql_delete_table
 440            % {
 441                "table": self.quote_name(model._meta.db_table),
 442            }
 443        )
 444        # Remove all deferred statements referencing the deleted table.
 445        for sql in list(self.deferred_sql):
 446            if isinstance(sql, Statement) and sql.references_table(
 447                model._meta.db_table
 448            ):
 449                self.deferred_sql.remove(sql)
 450
 451    def add_index(self, model, index):
 452        """Add an index on a model."""
 453        if (
 454            index.contains_expressions
 455            and not self.connection.features.supports_expression_indexes
 456        ):
 457            return None
 458        # Index.create_sql returns interpolated SQL which makes params=None a
 459        # necessity to avoid escaping attempts on execution.
 460        self.execute(index.create_sql(model, self), params=None)
 461
 462    def remove_index(self, model, index):
 463        """Remove an index from a model."""
 464        if (
 465            index.contains_expressions
 466            and not self.connection.features.supports_expression_indexes
 467        ):
 468            return None
 469        self.execute(index.remove_sql(model, self))
 470
 471    def rename_index(self, model, old_index, new_index):
 472        if self.connection.features.can_rename_index:
 473            self.execute(
 474                self._rename_index_sql(model, old_index.name, new_index.name),
 475                params=None,
 476            )
 477        else:
 478            self.remove_index(model, old_index)
 479            self.add_index(model, new_index)
 480
 481    def add_constraint(self, model, constraint):
 482        """Add a constraint to a model."""
 483        sql = constraint.create_sql(model, self)
 484        if sql:
 485            # Constraint.create_sql returns interpolated SQL which makes
 486            # params=None a necessity to avoid escaping attempts on execution.
 487            self.execute(sql, params=None)
 488
 489    def remove_constraint(self, model, constraint):
 490        """Remove a constraint from a model."""
 491        sql = constraint.remove_sql(model, self)
 492        if sql:
 493            self.execute(sql)
 494
 495    def alter_db_table(self, model, old_db_table, new_db_table):
 496        """Rename the table a model points to."""
 497        if old_db_table == new_db_table or (
 498            self.connection.features.ignores_table_name_case
 499            and old_db_table.lower() == new_db_table.lower()
 500        ):
 501            return
 502        self.execute(
 503            self.sql_rename_table
 504            % {
 505                "old_table": self.quote_name(old_db_table),
 506                "new_table": self.quote_name(new_db_table),
 507            }
 508        )
 509        # Rename all references to the old table name.
 510        for sql in self.deferred_sql:
 511            if isinstance(sql, Statement):
 512                sql.rename_table_references(old_db_table, new_db_table)
 513
 514    def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
 515        self.execute(
 516            self.sql_alter_table_comment
 517            % {
 518                "table": self.quote_name(model._meta.db_table),
 519                "comment": self.quote_value(new_db_table_comment or ""),
 520            }
 521        )
 522
    def add_field(self, model, field):
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).

        The column is added with its default included, the default is then
        dropped again when there is one, a column comment is added when the
        backend needs a separate statement for it, and field index SQL is
        queued in `self.deferred_sql`. Returns None; returns early without
        executing anything when the field has no column definition.
        """
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint
        ):
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                # Only the namespace part of the split table name is used.
                namespace, _ = split_identifier(model._meta.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model._meta.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            # `sql` and `params` are reused here for the DROP DEFAULT statement.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
 603
 604    def remove_field(self, model, field):
 605        """
 606        Remove a field from a model. Usually involves deleting a column,
 607        but for M2Ms may involve deleting a table.
 608        """
 609        # It might not actually have a column behind it
 610        if field.db_parameters(connection=self.connection)["type"] is None:
 611            return
 612        # Drop any FK constraints, MySQL requires explicit deletion
 613        if field.remote_field:
 614            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 615            for fk_name in fk_names:
 616                self.execute(self._delete_fk_sql(model, fk_name))
 617        # Delete the column
 618        sql = self.sql_delete_column % {
 619            "table": self.quote_name(model._meta.db_table),
 620            "column": self.quote_name(field.column),
 621        }
 622        self.execute(sql)
 623        # Reset connection if required
 624        if self.connection.features.connection_persists_old_columns:
 625            self.connection.close()
 626        # Remove all deferred statements referencing the deleted column.
 627        for sql in list(self.deferred_sql):
 628            if isinstance(sql, Statement) and sql.references_column(
 629                model._meta.db_table, field.column
 630            ):
 631                self.deferred_sql.remove(sql)
 632
 633    def alter_field(self, model, old_field, new_field, strict=False):
 634        """
 635        Allow a field's type, uniqueness, nullability, default, column,
 636        constraints, etc. to be modified.
 637        `old_field` is required to compute the necessary changes.
 638        If `strict` is True, raise errors if the old column does not match
 639        `old_field` precisely.
 640        """
 641        if not self._field_should_be_altered(old_field, new_field):
 642            return
 643        # Ensure this field is even column-based
 644        old_db_params = old_field.db_parameters(connection=self.connection)
 645        old_type = old_db_params["type"]
 646        new_db_params = new_field.db_parameters(connection=self.connection)
 647        new_type = new_db_params["type"]
 648        if (old_type is None and old_field.remote_field is None) or (
 649            new_type is None and new_field.remote_field is None
 650        ):
 651            raise ValueError(
 652                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 653                "db_type (are you using a badly-written custom field?)",
 654            )
 655        elif (
 656            old_type is None
 657            and new_type is None
 658            and (old_field.remote_field.through and new_field.remote_field.through)
 659        ):
 660            # Both sides have through models; this is a no-op.
 661            return
 662        elif old_type is None or new_type is None:
 663            raise ValueError(
 664                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 665                "(you cannot alter to or from M2M fields, or add or remove "
 666                "through= on M2M fields)"
 667            )
 668
 669        self._alter_field(
 670            model,
 671            old_field,
 672            new_field,
 673            old_type,
 674            new_type,
 675            old_db_params,
 676            new_db_params,
 677            strict,
 678        )
 679
 680    def _field_db_check(self, field, field_db_params):
 681        # Always check constraints with the same mocked column name to avoid
 682        # recreating constrains when the column is renamed.
 683        check_constraints = self.connection.data_type_check_constraints
 684        data = field.db_type_parameters(self.connection)
 685        data["column"] = "__column_name__"
 686        try:
 687            return check_constraints[field.get_internal_type()] % data
 688        except KeyError:
 689            return None
 690
    def _alter_field(
        self,
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict=False,
    ):
        """
        Perform a "physical" (non-ManyToMany) field update.

        Statements are executed in a fixed order: foreign key, unique, index,
        and check constraints tied to the old column are dropped first; the
        column is then renamed if its name changed; type, default, and
        nullability changes are applied; and finally the primary key, index,
        foreign keys, and check constraint needed by new_field are created.

        old_type and new_type are the column types being compared.
        old_db_params and new_db_params are mappings read for their
        "collation" entry and handed to _field_db_check(); new_db_params is
        also read for "check" when a check constraint is created.

        If strict is true, raise ValueError when the number of foreign key,
        unique, or check constraints found for the old column is not exactly
        one. strict is also forwarded to _delete_primary_key().
        """
        # Drop any FK constraints, we'll remake them later
        fks_dropped = set()
        if (
            self.connection.features.supports_foreign_keys
            and old_field.remote_field
            and old_field.db_constraint
            and self._field_should_be_altered(
                old_field,
                new_field,
                ignore={"db_comment"},
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model._meta.db_table}.{old_field.column}"
                )
            for fk_name in fk_names:
                # Remember that an FK was dropped; a non-empty fks_dropped
                # triggers re-creation of the FK near the end of this method.
                fks_dropped.add((old_field.column,))
                self.execute(self._delete_fk_sql(model, fk_name))
        # Has unique been removed?
        # NOTE(review): _field_became_primary_key() requires
        # old_field.primary_key to be false, so inside this condition only
        # `not new_field.primary_key` can actually be true.
        if old_field.primary_key and (
            not new_field.primary_key
            or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model._meta.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_unique_sql(model, constraint_name))
        # Drop incoming FK constraints if the field is a primary key or unique,
        # which might be a to_field target, and things are going to change.
        old_collation = old_db_params.get("collation")
        new_collation = new_db_params.get("collation")
        # Only needed when the field stays a primary key but its type or
        # collation changes.
        drop_foreign_keys = (
            self.connection.features.supports_foreign_keys
            and (old_field.primary_key and new_field.primary_key)
            and ((old_type != new_type) or (old_collation != new_collation))
        )
        if drop_foreign_keys:
            # '_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if db_index switched to False or a unique constraint
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        #
        # Note that "db_index" is only honoured here when the field also has a
        # remote_field.
        if (
            (old_field.remote_field and old_field.db_index)
            and not old_field.primary_key
            and (
                not (new_field.remote_field and new_field.db_index)
                or new_field.primary_key
            )
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model._meta.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints?
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model._meta.db_table}.{old_field.column}"
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_check_sql(model, constraint_name))
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model._meta.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model._meta.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do
        # - actions: (sql, params) fragments substituted into sql_alter_column
        # - null_actions: NULL / NOT NULL fragments, possibly run separately
        # - post_actions: complete (sql, params) statements executed afterwards
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
        # Type, collation, or comment change?
        if (
            old_type != new_type
            or old_type_suffix != new_type_suffix
            or old_collation != new_collation
            or (
                self.connection.features.supports_comments
                and old_field.db_comment != new_field.db_comment
            )
        ):
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type, old_collation, new_collation
            )
            actions.append(fragment)
            post_actions.extend(other_actions)
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if old_field.allow_null and not new_field.allow_null:
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if (
                not self.skip_default_on_alter(new_field)
                and old_default != new_default
                and new_default is not None
            ):
                # The temporary default added here is dropped again at the
                # end of the method.
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.allow_null != new_field.allow_null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = new_field.has_default() and (
            old_field.allow_null and not new_field.allow_null
        )
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together if we can (e.g. postgres)
            if self.connection.features.supports_combined_alters and actions:
                # Transpose [(sql, params), ...] into (sqls, params_lists),
                # then join the SQL and flatten the parameter lists.
                sql, params = tuple(zip(*actions))
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                # Update existing rows with default value
                # (new_default is bound here: four_way_default_alteration
                # implies the NULL -> NOT NULL branch above was taken.)
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "column": self.quote_name(new_field.column),
                        "default": "%s",
                    },
                    [new_default],
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": self.quote_name(model._meta.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)

        # Added an index? Add an index if db_index switched to True or a unique
        # constraint will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        if (
            (
                not (old_field.remote_field and old_field.db_index)
                or old_field.primary_key
            )
            and (new_field.remote_field and new_field.db_index)
            and not new_field.primary_key
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
            rel_type = rel_db_params["type"]
            rel_collation = rel_db_params.get("collation")
            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
            old_rel_collation = old_rel_db_params.get("collation")
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
                old_rel_collation,
                rel_collation,
            )
            # fragment is a (sql, params) pair.
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(new_rel.related_model._meta.db_table),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key?
        # Create it when the old FK was dropped above, or when the old field
        # had no relation / no database constraint to begin with.
        if (
            self.connection.features.supports_foreign_keys
            and new_field.remote_field
            and (
                fks_dropped or not old_field.remote_field or not old_field.db_constraint
            )
            and new_field.db_constraint
        ):
            self.execute(
                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model._meta.db_table, [new_field.column], suffix="_check"
            )
            self.execute(
                self._create_check_sql(model, constraint_name, new_db_params["check"])
            )
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
1025
1026    def _alter_column_null_sql(self, model, old_field, new_field):
1027        """
1028        Hook to specialize column null alteration.
1029
1030        Return a (sql, params) fragment to set a column to null or non-null
1031        as required by new_field, or None if no changes are required.
1032        """
1033        new_db_params = new_field.db_parameters(connection=self.connection)
1034        sql = (
1035            self.sql_alter_column_null
1036            if new_field.allow_null
1037            else self.sql_alter_column_not_null
1038        )
1039        return (
1040            sql
1041            % {
1042                "column": self.quote_name(new_field.column),
1043                "type": new_db_params["type"],
1044            },
1045            [],
1046        )
1047
1048    def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1049        """
1050        Hook to specialize column default alteration.
1051
1052        Return a (sql, params) fragment to add or drop (depending on the drop
1053        argument) a default to new_field's column.
1054        """
1055        new_default = self.effective_default(new_field)
1056        default = self._column_default_sql(new_field)
1057        params = [new_default]
1058
1059        if drop:
1060            params = []
1061        elif self.connection.features.requires_literal_defaults:
1062            # Some databases (Oracle) can't take defaults as a parameter
1063            # If this is the case, the SchemaEditor for that database should
1064            # implement prepare_default().
1065            default = self.prepare_default(new_default)
1066            params = []
1067
1068        new_db_params = new_field.db_parameters(connection=self.connection)
1069        if drop:
1070            if new_field.allow_null:
1071                sql = self.sql_alter_column_no_default_null
1072            else:
1073                sql = self.sql_alter_column_no_default
1074        else:
1075            sql = self.sql_alter_column_default
1076        return (
1077            sql
1078            % {
1079                "column": self.quote_name(new_field.column),
1080                "type": new_db_params["type"],
1081                "default": default,
1082            },
1083            params,
1084        )
1085
1086    def _alter_column_type_sql(
1087        self, model, old_field, new_field, new_type, old_collation, new_collation
1088    ):
1089        """
1090        Hook to specialize column type alteration for different backends,
1091        for cases when a creation type is different to an alteration type
1092        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1093
1094        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1095        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1096        run once the field is altered.
1097        """
1098        other_actions = []
1099        if collate_sql := self._collate_sql(
1100            new_collation, old_collation, model._meta.db_table
1101        ):
1102            collate_sql = f" {collate_sql}"
1103        else:
1104            collate_sql = ""
1105        # Comment change?
1106        comment_sql = ""
1107        if self.connection.features.supports_comments and not new_field.many_to_many:
1108            if old_field.db_comment != new_field.db_comment:
1109                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1110                # 'COMMENT ON ...' at the same time.
1111                sql, params = self._alter_column_comment_sql(
1112                    model, new_field, new_type, new_field.db_comment
1113                )
1114                if sql:
1115                    other_actions.append((sql, params))
1116            if new_field.db_comment:
1117                comment_sql = self._comment_sql(new_field.db_comment)
1118        return (
1119            (
1120                self.sql_alter_column_type
1121                % {
1122                    "column": self.quote_name(new_field.column),
1123                    "type": new_type,
1124                    "collation": collate_sql,
1125                    "comment": comment_sql,
1126                },
1127                [],
1128            ),
1129            other_actions,
1130        )
1131
1132    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1133        return (
1134            self.sql_alter_column_comment
1135            % {
1136                "table": self.quote_name(model._meta.db_table),
1137                "column": self.quote_name(new_field.column),
1138                "comment": self._comment_sql(new_db_comment),
1139            },
1140            [],
1141        )
1142
1143    def _comment_sql(self, comment):
1144        return self.quote_value(comment or "")
1145
1146    def _alter_many_to_many(self, model, old_field, new_field, strict):
1147        """Alter M2Ms to repoint their to= endpoints."""
1148        # Rename the through table
1149        if (
1150            old_field.remote_field.through._meta.db_table
1151            != new_field.remote_field.through._meta.db_table
1152        ):
1153            self.alter_db_table(
1154                old_field.remote_field.through,
1155                old_field.remote_field.through._meta.db_table,
1156                new_field.remote_field.through._meta.db_table,
1157            )
1158        # Repoint the FK to the other side
1159        self.alter_field(
1160            new_field.remote_field.through,
1161            # The field that points to the target model is needed, so we can
1162            # tell alter_field to change it - this is m2m_reverse_field_name()
1163            # (as opposed to m2m_field_name(), which points to our model).
1164            old_field.remote_field.through._meta.get_field(
1165                old_field.m2m_reverse_field_name()
1166            ),
1167            new_field.remote_field.through._meta.get_field(
1168                new_field.m2m_reverse_field_name()
1169            ),
1170        )
1171        self.alter_field(
1172            new_field.remote_field.through,
1173            # for self-referential models we need to alter field from the other end too
1174            old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1175            new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1176        )
1177
1178    def _create_index_name(self, table_name, column_names, suffix=""):
1179        """
1180        Generate a unique name for an index/unique constraint.
1181
1182        The name is divided into 3 parts: the table name, the column names,
1183        and a unique digest and suffix.
1184        """
1185        _, table_name = split_identifier(table_name)
1186        hash_suffix_part = (
1187            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1188        )
1189        max_length = self.connection.ops.max_name_length() or 200
1190        # If everything fits into max_length, use that name.
1191        index_name = "{}_{}_{}".format(
1192            table_name, "_".join(column_names), hash_suffix_part
1193        )
1194        if len(index_name) <= max_length:
1195            return index_name
1196        # Shorten a long suffix.
1197        if len(hash_suffix_part) > max_length / 3:
1198            hash_suffix_part = hash_suffix_part[: max_length // 3]
1199        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1200        index_name = "{}_{}_{}".format(
1201            table_name[:other_length],
1202            "_".join(column_names)[:other_length],
1203            hash_suffix_part,
1204        )
1205        # Prepend D if needed to prevent the name from starting with an
1206        # underscore or a number (not permitted on Oracle).
1207        if index_name[0] == "_" or index_name[0].isdigit():
1208            index_name = f"D{index_name[:-1]}"
1209        return index_name
1210
1211    def _index_condition_sql(self, condition):
1212        if condition:
1213            return " WHERE " + condition
1214        return ""
1215
1216    def _index_include_sql(self, model, columns):
1217        if not columns or not self.connection.features.supports_covering_indexes:
1218            return ""
1219        return Statement(
1220            " INCLUDE (%(columns)s)",
1221            columns=Columns(model._meta.db_table, columns, self.quote_name),
1222        )
1223
1224    def _create_index_sql(
1225        self,
1226        model,
1227        *,
1228        fields=None,
1229        name=None,
1230        suffix="",
1231        using="",
1232        col_suffixes=(),
1233        sql=None,
1234        opclasses=(),
1235        condition=None,
1236        include=None,
1237        expressions=None,
1238    ):
1239        """
1240        Return the SQL statement to create the index for one or several fields
1241        or expressions. `sql` can be specified if the syntax differs from the
1242        standard (GIS indexes, ...).
1243        """
1244        fields = fields or []
1245        expressions = expressions or []
1246        compiler = Query(model, alias_cols=False).get_compiler()
1247        columns = [field.column for field in fields]
1248        sql_create_index = sql or self.sql_create_index
1249        table = model._meta.db_table
1250
1251        def create_index_name(*args, **kwargs):
1252            nonlocal name
1253            if name is None:
1254                name = self._create_index_name(*args, **kwargs)
1255            return self.quote_name(name)
1256
1257        return Statement(
1258            sql_create_index,
1259            table=Table(table, self.quote_name),
1260            name=IndexName(table, columns, suffix, create_index_name),
1261            using=using,
1262            columns=(
1263                self._index_columns(table, columns, col_suffixes, opclasses)
1264                if columns
1265                else Expressions(table, expressions, compiler, self.quote_value)
1266            ),
1267            extra="",
1268            condition=self._index_condition_sql(condition),
1269            include=self._index_include_sql(model, include),
1270        )
1271
1272    def _delete_index_sql(self, model, name, sql=None):
1273        return Statement(
1274            sql or self.sql_delete_index,
1275            table=Table(model._meta.db_table, self.quote_name),
1276            name=self.quote_name(name),
1277        )
1278
1279    def _rename_index_sql(self, model, old_name, new_name):
1280        return Statement(
1281            self.sql_rename_index,
1282            table=Table(model._meta.db_table, self.quote_name),
1283            old_name=self.quote_name(old_name),
1284            new_name=self.quote_name(new_name),
1285        )
1286
1287    def _index_columns(self, table, columns, col_suffixes, opclasses):
1288        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1289
1290    def _model_indexes_sql(self, model):
1291        """
1292        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1293        """
1294        output = []
1295        for field in model._meta.local_fields:
1296            output.extend(self._field_indexes_sql(model, field))
1297
1298        for index in model._meta.indexes:
1299            if (
1300                not index.contains_expressions
1301                or self.connection.features.supports_expression_indexes
1302            ):
1303                output.append(index.create_sql(model, self))
1304        return output
1305
1306    def _field_indexes_sql(self, model, field):
1307        """
1308        Return a list of all index SQL statements for the specified field.
1309        """
1310        output = []
1311        if self._field_should_be_indexed(model, field):
1312            output.append(self._create_index_sql(model, fields=[field]))
1313        return output
1314
1315    def _field_should_be_altered(self, old_field, new_field, ignore=None):
1316        ignore = ignore or set()
1317        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1318        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1319        # Don't alter when:
1320        # - changing only a field name
1321        # - changing an attribute that doesn't affect the schema
1322        # - changing an attribute in the provided set of ignored attributes
1323        # - adding only a db_column and the column name is not changed
1324        for attr in ignore.union(old_field.non_db_attrs):
1325            old_kwargs.pop(attr, None)
1326        for attr in ignore.union(new_field.non_db_attrs):
1327            new_kwargs.pop(attr, None)
1328        return self.quote_name(old_field.column) != self.quote_name(
1329            new_field.column
1330        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1331
1332    def _field_should_be_indexed(self, model, field):
1333        return (field.remote_field and field.db_index) and not field.primary_key
1334
1335    def _field_became_primary_key(self, old_field, new_field):
1336        return not old_field.primary_key and new_field.primary_key
1337
1338    def _rename_field_sql(self, table, old_field, new_field, new_type):
1339        return self.sql_rename_column % {
1340            "table": self.quote_name(table),
1341            "old_column": self.quote_name(old_field.column),
1342            "new_column": self.quote_name(new_field.column),
1343            "type": new_type,
1344        }
1345
1346    def _create_fk_sql(self, model, field, suffix):
1347        table = Table(model._meta.db_table, self.quote_name)
1348        name = self._fk_constraint_name(model, field, suffix)
1349        column = Columns(model._meta.db_table, [field.column], self.quote_name)
1350        to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1351        to_column = Columns(
1352            field.target_field.model._meta.db_table,
1353            [field.target_field.column],
1354            self.quote_name,
1355        )
1356        deferrable = self.connection.ops.deferrable_sql()
1357        return Statement(
1358            self.sql_create_fk,
1359            table=table,
1360            name=name,
1361            column=column,
1362            to_table=to_table,
1363            to_column=to_column,
1364            deferrable=deferrable,
1365        )
1366
1367    def _fk_constraint_name(self, model, field, suffix):
1368        def create_fk_name(*args, **kwargs):
1369            return self.quote_name(self._create_index_name(*args, **kwargs))
1370
1371        return ForeignKeyName(
1372            model._meta.db_table,
1373            [field.column],
1374            split_identifier(field.target_field.model._meta.db_table)[1],
1375            [field.target_field.column],
1376            suffix,
1377            create_fk_name,
1378        )
1379
1380    def _delete_fk_sql(self, model, name):
1381        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1382
1383    def _deferrable_constraint_sql(self, deferrable):
1384        if deferrable is None:
1385            return ""
1386        if deferrable == Deferrable.DEFERRED:
1387            return " DEFERRABLE INITIALLY DEFERRED"
1388        if deferrable == Deferrable.IMMEDIATE:
1389            return " DEFERRABLE INITIALLY IMMEDIATE"
1390
1391    def _unique_sql(
1392        self,
1393        model,
1394        fields,
1395        name,
1396        condition=None,
1397        deferrable=None,
1398        include=None,
1399        opclasses=None,
1400        expressions=None,
1401    ):
1402        if (
1403            deferrable
1404            and not self.connection.features.supports_deferrable_unique_constraints
1405        ):
1406            return None
1407        if condition or include or opclasses or expressions:
1408            # Databases support conditional, covering, and functional unique
1409            # constraints via a unique index.
1410            sql = self._create_unique_sql(
1411                model,
1412                fields,
1413                name=name,
1414                condition=condition,
1415                include=include,
1416                opclasses=opclasses,
1417                expressions=expressions,
1418            )
1419            if sql:
1420                self.deferred_sql.append(sql)
1421            return None
1422        constraint = self.sql_unique_constraint % {
1423            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1424            "deferrable": self._deferrable_constraint_sql(deferrable),
1425        }
1426        return self.sql_constraint % {
1427            "name": self.quote_name(name),
1428            "constraint": constraint,
1429        }
1430
1431    def _create_unique_sql(
1432        self,
1433        model,
1434        fields,
1435        name=None,
1436        condition=None,
1437        deferrable=None,
1438        include=None,
1439        opclasses=None,
1440        expressions=None,
1441    ):
1442        if (
1443            (
1444                deferrable
1445                and not self.connection.features.supports_deferrable_unique_constraints
1446            )
1447            or (condition and not self.connection.features.supports_partial_indexes)
1448            or (include and not self.connection.features.supports_covering_indexes)
1449            or (
1450                expressions and not self.connection.features.supports_expression_indexes
1451            )
1452        ):
1453            return None
1454
1455        compiler = Query(model, alias_cols=False).get_compiler()
1456        table = model._meta.db_table
1457        columns = [field.column for field in fields]
1458        if name is None:
1459            name = self._unique_constraint_name(table, columns, quote=True)
1460        else:
1461            name = self.quote_name(name)
1462        if condition or include or opclasses or expressions:
1463            sql = self.sql_create_unique_index
1464        else:
1465            sql = self.sql_create_unique
1466        if columns:
1467            columns = self._index_columns(
1468                table, columns, col_suffixes=(), opclasses=opclasses
1469            )
1470        else:
1471            columns = Expressions(table, expressions, compiler, self.quote_value)
1472        return Statement(
1473            sql,
1474            table=Table(table, self.quote_name),
1475            name=name,
1476            columns=columns,
1477            condition=self._index_condition_sql(condition),
1478            deferrable=self._deferrable_constraint_sql(deferrable),
1479            include=self._index_include_sql(model, include),
1480        )
1481
1482    def _unique_constraint_name(self, table, columns, quote=True):
1483        if quote:
1484
1485            def create_unique_name(*args, **kwargs):
1486                return self.quote_name(self._create_index_name(*args, **kwargs))
1487
1488        else:
1489            create_unique_name = self._create_index_name
1490
1491        return IndexName(table, columns, "_uniq", create_unique_name)
1492
1493    def _delete_unique_sql(
1494        self,
1495        model,
1496        name,
1497        condition=None,
1498        deferrable=None,
1499        include=None,
1500        opclasses=None,
1501        expressions=None,
1502    ):
1503        if (
1504            (
1505                deferrable
1506                and not self.connection.features.supports_deferrable_unique_constraints
1507            )
1508            or (condition and not self.connection.features.supports_partial_indexes)
1509            or (include and not self.connection.features.supports_covering_indexes)
1510            or (
1511                expressions and not self.connection.features.supports_expression_indexes
1512            )
1513        ):
1514            return None
1515        if condition or include or opclasses or expressions:
1516            sql = self.sql_delete_index
1517        else:
1518            sql = self.sql_delete_unique
1519        return self._delete_constraint_sql(sql, model, name)
1520
1521    def _check_sql(self, name, check):
1522        return self.sql_constraint % {
1523            "name": self.quote_name(name),
1524            "constraint": self.sql_check_constraint % {"check": check},
1525        }
1526
1527    def _create_check_sql(self, model, name, check):
1528        if not self.connection.features.supports_table_check_constraints:
1529            return None
1530        return Statement(
1531            self.sql_create_check,
1532            table=Table(model._meta.db_table, self.quote_name),
1533            name=self.quote_name(name),
1534            check=check,
1535        )
1536
1537    def _delete_check_sql(self, model, name):
1538        if not self.connection.features.supports_table_check_constraints:
1539            return None
1540        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1541
1542    def _delete_constraint_sql(self, template, model, name):
1543        return Statement(
1544            template,
1545            table=Table(model._meta.db_table, self.quote_name),
1546            name=self.quote_name(name),
1547        )
1548
1549    def _constraint_names(
1550        self,
1551        model,
1552        column_names=None,
1553        unique=None,
1554        primary_key=None,
1555        index=None,
1556        foreign_key=None,
1557        check=None,
1558        type_=None,
1559        exclude=None,
1560    ):
1561        """Return all constraint names matching the columns and conditions."""
1562        if column_names is not None:
1563            column_names = [
1564                self.connection.introspection.identifier_converter(
1565                    truncate_name(name, self.connection.ops.max_name_length())
1566                )
1567                if self.connection.features.truncates_names
1568                else self.connection.introspection.identifier_converter(name)
1569                for name in column_names
1570            ]
1571        with self.connection.cursor() as cursor:
1572            constraints = self.connection.introspection.get_constraints(
1573                cursor, model._meta.db_table
1574            )
1575        result = []
1576        for name, infodict in constraints.items():
1577            if column_names is None or column_names == infodict["columns"]:
1578                if unique is not None and infodict["unique"] != unique:
1579                    continue
1580                if primary_key is not None and infodict["primary_key"] != primary_key:
1581                    continue
1582                if index is not None and infodict["index"] != index:
1583                    continue
1584                if check is not None and infodict["check"] != check:
1585                    continue
1586                if foreign_key is not None and not infodict["foreign_key"]:
1587                    continue
1588                if type_ is not None and infodict["type"] != type_:
1589                    continue
1590                if not exclude or name not in exclude:
1591                    result.append(name)
1592        return result
1593
1594    def _delete_primary_key(self, model, strict=False):
1595        constraint_names = self._constraint_names(model, primary_key=True)
1596        if strict and len(constraint_names) != 1:
1597            raise ValueError(
1598                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model._meta.db_table}"
1599            )
1600        for constraint_name in constraint_names:
1601            self.execute(self._delete_primary_key_sql(model, constraint_name))
1602
1603    def _create_primary_key_sql(self, model, field):
1604        return Statement(
1605            self.sql_create_pk,
1606            table=Table(model._meta.db_table, self.quote_name),
1607            name=self.quote_name(
1608                self._create_index_name(
1609                    model._meta.db_table, [field.column], suffix="_pk"
1610                )
1611            ),
1612            columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1613        )
1614
1615    def _delete_primary_key_sql(self, model, name):
1616        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1617
1618    def _collate_sql(self, collation, old_collation=None, table_name=None):
1619        return "COLLATE " + self.quote_name(collation) if collation else ""