Plain is headed towards 1.0! Subscribe for development updates →

   1import logging
   2import operator
   3from datetime import datetime
   4
   5from plain.models.backends.ddl_references import (
   6    Columns,
   7    Expressions,
   8    ForeignKeyName,
   9    IndexName,
  10    Statement,
  11    Table,
  12)
  13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  14from plain.models.constraints import Deferrable
  15from plain.models.indexes import Index
  16from plain.models.sql import Query
  17from plain.models.transaction import TransactionManagementError, atomic
  18from plain.utils import timezone
  19
  20logger = logging.getLogger("plain.models.backends.schema")
  21
  22
  23def _is_relevant_relation(relation, altered_field):
  24    """
  25    When altering the given field, must constraints on its model from the given
  26    relation be temporarily dropped?
  27    """
  28    field = relation.field
  29    if field.many_to_many:
  30        # M2M reverse field
  31        return False
  32    if altered_field.primary_key and field.to_fields == [None]:
  33        # Foreign key constraint on the primary key, which is being altered.
  34        return True
  35    # Is the constraint targeting the field being altered?
  36    return altered_field.name in field.to_fields
  37
  38
  39def _all_related_fields(model):
  40    # Related fields must be returned in a deterministic order.
  41    return sorted(
  42        model._meta._get_fields(
  43            forward=False,
  44            reverse=True,
  45            include_hidden=True,
  46        ),
  47        key=operator.attrgetter("name"),
  48    )
  49
  50
  51def _related_non_m2m_objects(old_field, new_field):
  52    # Filter out m2m objects from reverse relations.
  53    # Return (old_relation, new_relation) tuples.
  54    related_fields = zip(
  55        (
  56            obj
  57            for obj in _all_related_fields(old_field.model)
  58            if _is_relevant_relation(obj, old_field)
  59        ),
  60        (
  61            obj
  62            for obj in _all_related_fields(new_field.model)
  63            if _is_relevant_relation(obj, new_field)
  64        ),
  65    )
  66    for old_rel, new_rel in related_fields:
  67        yield old_rel, new_rel
  68        yield from _related_non_m2m_objects(
  69            old_rel.remote_field,
  70            new_rel.remote_field,
  71        )
  72
  73
  74class BaseDatabaseSchemaEditor:
  75    """
  76    This class and its subclasses are responsible for emitting schema-changing
  77    statements to the databases - model creation/removal/alteration, field
  78    renaming, index fiddling, and so on.
  79    """
  80
  81    # Overrideable SQL templates
  82    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
  83    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
  84    sql_delete_table = "DROP TABLE %(table)s CASCADE"
  85
  86    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
  87    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
  88    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
  89    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
  90    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
  91    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
  92    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
  93    sql_alter_column_no_default_null = sql_alter_column_no_default
  94    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
  95    sql_rename_column = (
  96        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
  97    )
  98    sql_update_with_default = (
  99        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
 100    )
 101
 102    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
 103    sql_check_constraint = "CHECK (%(check)s)"
 104    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
 105    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
 106
 107    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
 108    sql_delete_check = sql_delete_constraint
 109
 110    sql_create_unique = (
 111        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
 112        "UNIQUE (%(columns)s)%(deferrable)s"
 113    )
 114    sql_delete_unique = sql_delete_constraint
 115
 116    sql_create_fk = (
 117        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
 118        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
 119    )
 120    sql_create_inline_fk = None
 121    sql_create_column_inline_fk = None
 122    sql_delete_fk = sql_delete_constraint
 123
 124    sql_create_index = (
 125        "CREATE INDEX %(name)s ON %(table)s "
 126        "(%(columns)s)%(include)s%(extra)s%(condition)s"
 127    )
 128    sql_create_unique_index = (
 129        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
 130        "(%(columns)s)%(include)s%(condition)s"
 131    )
 132    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
 133    sql_delete_index = "DROP INDEX %(name)s"
 134
 135    sql_create_pk = (
 136        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
 137    )
 138    sql_delete_pk = sql_delete_constraint
 139
 140    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
 141    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 142
 143    def __init__(self, connection, collect_sql=False, atomic=True):
 144        self.connection = connection
 145        self.collect_sql = collect_sql
 146        if self.collect_sql:
 147            self.collected_sql = []
 148        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 149
 150    # State-managing methods
 151
 152    def __enter__(self):
 153        self.deferred_sql = []
 154        if self.atomic_migration:
 155            self.atomic = atomic()
 156            self.atomic.__enter__()
 157        return self
 158
 159    def __exit__(self, exc_type, exc_value, traceback):
 160        if exc_type is None:
 161            for sql in self.deferred_sql:
 162                self.execute(sql)
 163        if self.atomic_migration:
 164            self.atomic.__exit__(exc_type, exc_value, traceback)
 165
 166    # Core utility functions
 167
 168    def execute(self, sql, params=()):
 169        """Execute the given SQL statement, with optional parameters."""
 170        # Don't perform the transactional DDL check if SQL is being collected
 171        # as it's not going to be executed anyway.
 172        if (
 173            not self.collect_sql
 174            and self.connection.in_atomic_block
 175            and not self.connection.features.can_rollback_ddl
 176        ):
 177            raise TransactionManagementError(
 178                "Executing DDL statements while in a transaction on databases "
 179                "that can't perform a rollback is prohibited."
 180            )
 181        # Account for non-string statement objects.
 182        sql = str(sql)
 183        # Log the command we're running, then run it
 184        logger.debug(
 185            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 186        )
 187        if self.collect_sql:
 188            ending = "" if sql.rstrip().endswith(";") else ";"
 189            if params is not None:
 190                self.collected_sql.append(
 191                    (sql % tuple(map(self.quote_value, params))) + ending
 192                )
 193            else:
 194                self.collected_sql.append(sql + ending)
 195        else:
 196            with self.connection.cursor() as cursor:
 197                cursor.execute(sql, params)
 198
 199    def quote_name(self, name):
 200        return self.connection.ops.quote_name(name)
 201
 202    def table_sql(self, model):
 203        """Take a model and return its table definition."""
 204        # Create column SQL, add FK deferreds if needed.
 205        column_sqls = []
 206        params = []
 207        for field in model._meta.local_fields:
 208            # SQL.
 209            definition, extra_params = self.column_sql(model, field)
 210            if definition is None:
 211                continue
 212            # Check constraints can go on the column SQL here.
 213            db_params = field.db_parameters(connection=self.connection)
 214            if db_params["check"]:
 215                definition += " " + self.sql_check_constraint % db_params
 216            # Autoincrement SQL (for backends with inline variant).
 217            col_type_suffix = field.db_type_suffix(connection=self.connection)
 218            if col_type_suffix:
 219                definition += f" {col_type_suffix}"
 220            params.extend(extra_params)
 221            # FK.
 222            if field.remote_field and field.db_constraint:
 223                to_table = field.remote_field.model._meta.db_table
 224                to_column = field.remote_field.model._meta.get_field(
 225                    field.remote_field.field_name
 226                ).column
 227                if self.sql_create_inline_fk:
 228                    definition += " " + self.sql_create_inline_fk % {
 229                        "to_table": self.quote_name(to_table),
 230                        "to_column": self.quote_name(to_column),
 231                    }
 232                elif self.connection.features.supports_foreign_keys:
 233                    self.deferred_sql.append(
 234                        self._create_fk_sql(
 235                            model, field, "_fk_%(to_table)s_%(to_column)s"
 236                        )
 237                    )
 238            # Add the SQL to our big list.
 239            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
 240            # Autoincrement SQL (for backends with post table definition
 241            # variant).
 242            if field.get_internal_type() in (
 243                "AutoField",
 244                "BigAutoField",
 245                "SmallAutoField",
 246            ):
 247                autoinc_sql = self.connection.ops.autoinc_sql(
 248                    model._meta.db_table, field.column
 249                )
 250                if autoinc_sql:
 251                    self.deferred_sql.extend(autoinc_sql)
 252        constraints = [
 253            constraint.constraint_sql(model, self)
 254            for constraint in model._meta.constraints
 255        ]
 256        sql = self.sql_create_table % {
 257            "table": self.quote_name(model._meta.db_table),
 258            "definition": ", ".join(
 259                str(constraint)
 260                for constraint in (*column_sqls, *constraints)
 261                if constraint
 262            ),
 263        }
 264        return sql, params
 265
 266    # Field <-> database mapping functions
 267
 268    def _iter_column_sql(
 269        self, column_db_type, params, model, field, field_db_params, include_default
 270    ):
 271        yield column_db_type
 272        if collation := field_db_params.get("collation"):
 273            yield self._collate_sql(collation)
 274        if self.connection.features.supports_comments_inline and field.db_comment:
 275            yield self._comment_sql(field.db_comment)
 276        # Work out nullability.
 277        null = field.allow_null
 278        # Include a default value, if requested.
 279        include_default = (
 280            include_default
 281            and not self.skip_default(field)
 282            and
 283            # Don't include a default value if it's a nullable field and the
 284            # default cannot be dropped in the ALTER COLUMN statement (e.g.
 285            # MySQL longtext and longblob).
 286            not (null and self.skip_default_on_alter(field))
 287        )
 288        if include_default:
 289            default_value = self.effective_default(field)
 290            if default_value is not None:
 291                column_default = "DEFAULT " + self._column_default_sql(field)
 292                if self.connection.features.requires_literal_defaults:
 293                    # Some databases can't take defaults as a parameter (Oracle).
 294                    # If this is the case, the individual schema backend should
 295                    # implement prepare_default().
 296                    yield column_default % self.prepare_default(default_value)
 297                else:
 298                    yield column_default
 299                    params.append(default_value)
 300
 301        if not null:
 302            yield "NOT NULL"
 303        elif not self.connection.features.implied_column_null:
 304            yield "NULL"
 305
 306        if field.primary_key:
 307            yield "PRIMARY KEY"
 308
 309    def column_sql(self, model, field, include_default=False):
 310        """
 311        Return the column definition for a field. The field must already have
 312        had set_attributes_from_name() called.
 313        """
 314        # Get the column's type and use that as the basis of the SQL.
 315        field_db_params = field.db_parameters(connection=self.connection)
 316        column_db_type = field_db_params["type"]
 317        # Check for fields that aren't actually columns (e.g. M2M).
 318        if column_db_type is None:
 319            return None, None
 320        params = []
 321        return (
 322            " ".join(
 323                # This appends to the params being returned.
 324                self._iter_column_sql(
 325                    column_db_type,
 326                    params,
 327                    model,
 328                    field,
 329                    field_db_params,
 330                    include_default,
 331                )
 332            ),
 333            params,
 334        )
 335
 336    def skip_default(self, field):
 337        """
 338        Some backends don't accept default values for certain columns types
 339        (i.e. MySQL longtext and longblob).
 340        """
 341        return False
 342
 343    def skip_default_on_alter(self, field):
 344        """
 345        Some backends don't accept default values for certain columns types
 346        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 347        """
 348        return False
 349
 350    def prepare_default(self, value):
 351        """
 352        Only used for backends which have requires_literal_defaults feature
 353        """
 354        raise NotImplementedError(
 355            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 356            "requires_literal_defaults must provide a prepare_default() method"
 357        )
 358
 359    def _column_default_sql(self, field):
 360        """
 361        Return the SQL to use in a DEFAULT clause. The resulting string should
 362        contain a '%s' placeholder for a default value.
 363        """
 364        return "%s"
 365
 366    @staticmethod
 367    def _effective_default(field):
 368        # This method allows testing its logic without a connection.
 369        if field.has_default():
 370            default = field.get_default()
 371        elif (
 372            not field.allow_null and not field.required and field.empty_strings_allowed
 373        ):
 374            if field.get_internal_type() == "BinaryField":
 375                default = b""
 376            else:
 377                default = ""
 378        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 379            internal_type = field.get_internal_type()
 380            if internal_type == "DateTimeField":
 381                default = timezone.now()
 382            else:
 383                default = datetime.now()
 384                if internal_type == "DateField":
 385                    default = default.date()
 386                elif internal_type == "TimeField":
 387                    default = default.time()
 388        else:
 389            default = None
 390        return default
 391
 392    def effective_default(self, field):
 393        """Return a field's effective database default value."""
 394        return field.get_db_prep_save(self._effective_default(field), self.connection)
 395
 396    def quote_value(self, value):
 397        """
 398        Return a quoted version of the value so it's safe to use in an SQL
 399        string. This is not safe against injection from user code; it is
 400        intended only for use in making SQL scripts or preparing default values
 401        for particularly tricky backends (defaults are not user-defined, though,
 402        so this is safe).
 403        """
 404        raise NotImplementedError()
 405
 406    # Actions
 407
 408    def create_model(self, model):
 409        """
 410        Create a table and any accompanying indexes or unique constraints for
 411        the given `model`.
 412        """
 413        sql, params = self.table_sql(model)
 414        # Prevent using [] as params, in the case a literal '%' is used in the
 415        # definition.
 416        self.execute(sql, params or None)
 417
 418        if self.connection.features.supports_comments:
 419            # Add table comment.
 420            if model._meta.db_table_comment:
 421                self.alter_db_table_comment(model, None, model._meta.db_table_comment)
 422            # Add column comments.
 423            if not self.connection.features.supports_comments_inline:
 424                for field in model._meta.local_fields:
 425                    if field.db_comment:
 426                        field_db_params = field.db_parameters(
 427                            connection=self.connection
 428                        )
 429                        field_type = field_db_params["type"]
 430                        self.execute(
 431                            *self._alter_column_comment_sql(
 432                                model, field, field_type, field.db_comment
 433                            )
 434                        )
 435        # Add any field index (deferred as SQLite _remake_table needs it).
 436        self.deferred_sql.extend(self._model_indexes_sql(model))
 437
 438    def delete_model(self, model):
 439        """Delete a model from the database."""
 440
 441        # Delete the table
 442        self.execute(
 443            self.sql_delete_table
 444            % {
 445                "table": self.quote_name(model._meta.db_table),
 446            }
 447        )
 448        # Remove all deferred statements referencing the deleted table.
 449        for sql in list(self.deferred_sql):
 450            if isinstance(sql, Statement) and sql.references_table(
 451                model._meta.db_table
 452            ):
 453                self.deferred_sql.remove(sql)
 454
 455    def add_index(self, model, index):
 456        """Add an index on a model."""
 457        if (
 458            index.contains_expressions
 459            and not self.connection.features.supports_expression_indexes
 460        ):
 461            return None
 462        # Index.create_sql returns interpolated SQL which makes params=None a
 463        # necessity to avoid escaping attempts on execution.
 464        self.execute(index.create_sql(model, self), params=None)
 465
 466    def remove_index(self, model, index):
 467        """Remove an index from a model."""
 468        if (
 469            index.contains_expressions
 470            and not self.connection.features.supports_expression_indexes
 471        ):
 472            return None
 473        self.execute(index.remove_sql(model, self))
 474
 475    def rename_index(self, model, old_index, new_index):
 476        if self.connection.features.can_rename_index:
 477            self.execute(
 478                self._rename_index_sql(model, old_index.name, new_index.name),
 479                params=None,
 480            )
 481        else:
 482            self.remove_index(model, old_index)
 483            self.add_index(model, new_index)
 484
 485    def add_constraint(self, model, constraint):
 486        """Add a constraint to a model."""
 487        sql = constraint.create_sql(model, self)
 488        if sql:
 489            # Constraint.create_sql returns interpolated SQL which makes
 490            # params=None a necessity to avoid escaping attempts on execution.
 491            self.execute(sql, params=None)
 492
 493    def remove_constraint(self, model, constraint):
 494        """Remove a constraint from a model."""
 495        sql = constraint.remove_sql(model, self)
 496        if sql:
 497            self.execute(sql)
 498
 499    def alter_db_table(self, model, old_db_table, new_db_table):
 500        """Rename the table a model points to."""
 501        if old_db_table == new_db_table or (
 502            self.connection.features.ignores_table_name_case
 503            and old_db_table.lower() == new_db_table.lower()
 504        ):
 505            return
 506        self.execute(
 507            self.sql_rename_table
 508            % {
 509                "old_table": self.quote_name(old_db_table),
 510                "new_table": self.quote_name(new_db_table),
 511            }
 512        )
 513        # Rename all references to the old table name.
 514        for sql in self.deferred_sql:
 515            if isinstance(sql, Statement):
 516                sql.rename_table_references(old_db_table, new_db_table)
 517
 518    def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
 519        self.execute(
 520            self.sql_alter_table_comment
 521            % {
 522                "table": self.quote_name(model._meta.db_table),
 523                "comment": self.quote_value(new_db_table_comment or ""),
 524            }
 525        )
 526
 527    def add_field(self, model, field):
 528        """
 529        Create a field on a model. Usually involves adding a column, but may
 530        involve adding a table instead (for M2M fields).
 531        """
 532        # Get the column's definition
 533        definition, params = self.column_sql(model, field, include_default=True)
 534        # It might not actually have a column behind it
 535        if definition is None:
 536            return
 537        if col_type_suffix := field.db_type_suffix(connection=self.connection):
 538            definition += f" {col_type_suffix}"
 539        # Check constraints can go on the column SQL here
 540        db_params = field.db_parameters(connection=self.connection)
 541        if db_params["check"]:
 542            definition += " " + self.sql_check_constraint % db_params
 543        if (
 544            field.remote_field
 545            and self.connection.features.supports_foreign_keys
 546            and field.db_constraint
 547        ):
 548            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
 549            # Add FK constraint inline, if supported.
 550            if self.sql_create_column_inline_fk:
 551                to_table = field.remote_field.model._meta.db_table
 552                to_column = field.remote_field.model._meta.get_field(
 553                    field.remote_field.field_name
 554                ).column
 555                namespace, _ = split_identifier(model._meta.db_table)
 556                definition += " " + self.sql_create_column_inline_fk % {
 557                    "name": self._fk_constraint_name(model, field, constraint_suffix),
 558                    "namespace": f"{self.quote_name(namespace)}." if namespace else "",
 559                    "column": self.quote_name(field.column),
 560                    "to_table": self.quote_name(to_table),
 561                    "to_column": self.quote_name(to_column),
 562                    "deferrable": self.connection.ops.deferrable_sql(),
 563                }
 564            # Otherwise, add FK constraints later.
 565            else:
 566                self.deferred_sql.append(
 567                    self._create_fk_sql(model, field, constraint_suffix)
 568                )
 569        # Build the SQL and run it
 570        sql = self.sql_create_column % {
 571            "table": self.quote_name(model._meta.db_table),
 572            "column": self.quote_name(field.column),
 573            "definition": definition,
 574        }
 575        self.execute(sql, params)
 576        # Drop the default if we need to
 577        # (Plain usually does not use in-database defaults)
 578        if (
 579            not self.skip_default_on_alter(field)
 580            and self.effective_default(field) is not None
 581        ):
 582            changes_sql, params = self._alter_column_default_sql(
 583                model, None, field, drop=True
 584            )
 585            sql = self.sql_alter_column % {
 586                "table": self.quote_name(model._meta.db_table),
 587                "changes": changes_sql,
 588            }
 589            self.execute(sql, params)
 590        # Add field comment, if required.
 591        if (
 592            field.db_comment
 593            and self.connection.features.supports_comments
 594            and not self.connection.features.supports_comments_inline
 595        ):
 596            field_type = db_params["type"]
 597            self.execute(
 598                *self._alter_column_comment_sql(
 599                    model, field, field_type, field.db_comment
 600                )
 601            )
 602        # Add an index, if required
 603        self.deferred_sql.extend(self._field_indexes_sql(model, field))
 604        # Reset connection if required
 605        if self.connection.features.connection_persists_old_columns:
 606            self.connection.close()
 607
 608    def remove_field(self, model, field):
 609        """
 610        Remove a field from a model. Usually involves deleting a column,
 611        but for M2Ms may involve deleting a table.
 612        """
 613        # It might not actually have a column behind it
 614        if field.db_parameters(connection=self.connection)["type"] is None:
 615            return
 616        # Drop any FK constraints, MySQL requires explicit deletion
 617        if field.remote_field:
 618            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 619            for fk_name in fk_names:
 620                self.execute(self._delete_fk_sql(model, fk_name))
 621        # Delete the column
 622        sql = self.sql_delete_column % {
 623            "table": self.quote_name(model._meta.db_table),
 624            "column": self.quote_name(field.column),
 625        }
 626        self.execute(sql)
 627        # Reset connection if required
 628        if self.connection.features.connection_persists_old_columns:
 629            self.connection.close()
 630        # Remove all deferred statements referencing the deleted column.
 631        for sql in list(self.deferred_sql):
 632            if isinstance(sql, Statement) and sql.references_column(
 633                model._meta.db_table, field.column
 634            ):
 635                self.deferred_sql.remove(sql)
 636
 637    def alter_field(self, model, old_field, new_field, strict=False):
 638        """
 639        Allow a field's type, uniqueness, nullability, default, column,
 640        constraints, etc. to be modified.
 641        `old_field` is required to compute the necessary changes.
 642        If `strict` is True, raise errors if the old column does not match
 643        `old_field` precisely.
 644        """
 645        if not self._field_should_be_altered(old_field, new_field):
 646            return
 647        # Ensure this field is even column-based
 648        old_db_params = old_field.db_parameters(connection=self.connection)
 649        old_type = old_db_params["type"]
 650        new_db_params = new_field.db_parameters(connection=self.connection)
 651        new_type = new_db_params["type"]
 652        if (old_type is None and old_field.remote_field is None) or (
 653            new_type is None and new_field.remote_field is None
 654        ):
 655            raise ValueError(
 656                f"Cannot alter field {old_field} into {new_field} - they do not properly define "
 657                "db_type (are you using a badly-written custom field?)",
 658            )
 659        elif (
 660            old_type is None
 661            and new_type is None
 662            and (old_field.remote_field.through and new_field.remote_field.through)
 663        ):
 664            # Both sides have through models; this is a no-op.
 665            return
 666        elif old_type is None or new_type is None:
 667            raise ValueError(
 668                f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
 669                "(you cannot alter to or from M2M fields, or add or remove "
 670                "through= on M2M fields)"
 671            )
 672
 673        self._alter_field(
 674            model,
 675            old_field,
 676            new_field,
 677            old_type,
 678            new_type,
 679            old_db_params,
 680            new_db_params,
 681            strict,
 682        )
 683
 684    def _field_db_check(self, field, field_db_params):
 685        # Always check constraints with the same mocked column name to avoid
 686        # recreating constrains when the column is renamed.
 687        check_constraints = self.connection.data_type_check_constraints
 688        data = field.db_type_parameters(self.connection)
 689        data["column"] = "__column_name__"
 690        try:
 691            return check_constraints[field.get_internal_type()] % data
 692        except KeyError:
 693            return None
 694
 695    def _alter_field(
 696        self,
 697        model,
 698        old_field,
 699        new_field,
 700        old_type,
 701        new_type,
 702        old_db_params,
 703        new_db_params,
 704        strict=False,
 705    ):
 706        """Perform a "physical" (non-ManyToMany) field update."""
 707        # Drop any FK constraints, we'll remake them later
 708        fks_dropped = set()
 709        if (
 710            self.connection.features.supports_foreign_keys
 711            and old_field.remote_field
 712            and old_field.db_constraint
 713            and self._field_should_be_altered(
 714                old_field,
 715                new_field,
 716                ignore={"db_comment"},
 717            )
 718        ):
 719            fk_names = self._constraint_names(
 720                model, [old_field.column], foreign_key=True
 721            )
 722            if strict and len(fk_names) != 1:
 723                raise ValueError(
 724                    f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model._meta.db_table}.{old_field.column}"
 725                )
 726            for fk_name in fk_names:
 727                fks_dropped.add((old_field.column,))
 728                self.execute(self._delete_fk_sql(model, fk_name))
 729        # Has unique been removed?
 730        if old_field.primary_key and (
 731            not new_field.primary_key
 732            or self._field_became_primary_key(old_field, new_field)
 733        ):
 734            # Find the unique constraint for this field
 735            meta_constraint_names = {
 736                constraint.name for constraint in model._meta.constraints
 737            }
 738            constraint_names = self._constraint_names(
 739                model,
 740                [old_field.column],
 741                unique=True,
 742                primary_key=False,
 743                exclude=meta_constraint_names,
 744            )
 745            if strict and len(constraint_names) != 1:
 746                raise ValueError(
 747                    f"Found wrong number ({len(constraint_names)}) of unique constraints for {model._meta.db_table}.{old_field.column}"
 748                )
 749            for constraint_name in constraint_names:
 750                self.execute(self._delete_unique_sql(model, constraint_name))
 751        # Drop incoming FK constraints if the field is a primary key or unique,
 752        # which might be a to_field target, and things are going to change.
 753        old_collation = old_db_params.get("collation")
 754        new_collation = new_db_params.get("collation")
 755        drop_foreign_keys = (
 756            self.connection.features.supports_foreign_keys
 757            and (old_field.primary_key and new_field.primary_key)
 758            and ((old_type != new_type) or (old_collation != new_collation))
 759        )
 760        if drop_foreign_keys:
 761            # '_meta.related_field' also contains M2M reverse fields, these
 762            # will be filtered out
 763            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
 764                rel_fk_names = self._constraint_names(
 765                    new_rel.related_model, [new_rel.field.column], foreign_key=True
 766                )
 767                for fk_name in rel_fk_names:
 768                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
 769        # Removed an index? (no strict check, as multiple indexes are possible)
 770        # Remove indexes if db_index switched to False or a unique constraint
 771        # will now be used in lieu of an index. The following lines from the
 772        # truth table show all True cases; the rest are False:
 773        #
 774        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 775        # ------------------------------------------------------------------------------
 776        # True               | False            | False              | False
 777        # True               | False            | False              | True
 778        # True               | False            | True               | True
 779        if (
 780            (old_field.remote_field and old_field.db_index)
 781            and not old_field.primary_key
 782            and (
 783                not (new_field.remote_field and new_field.db_index)
 784                or new_field.primary_key
 785            )
 786        ):
 787            # Find the index for this field
 788            meta_index_names = {index.name for index in model._meta.indexes}
 789            # Retrieve only BTREE indexes since this is what's created with
 790            # db_index=True.
 791            index_names = self._constraint_names(
 792                model,
 793                [old_field.column],
 794                index=True,
 795                type_=Index.suffix,
 796                exclude=meta_index_names,
 797            )
 798            for index_name in index_names:
 799                # The only way to check if an index was created with
 800                # db_index=True or with Index(['field'], name='foo')
 801                # is to look at its name (refs #28053).
 802                self.execute(self._delete_index_sql(model, index_name))
 803        # Change check constraints?
 804        old_db_check = self._field_db_check(old_field, old_db_params)
 805        new_db_check = self._field_db_check(new_field, new_db_params)
 806        if old_db_check != new_db_check and old_db_check:
 807            meta_constraint_names = {
 808                constraint.name for constraint in model._meta.constraints
 809            }
 810            constraint_names = self._constraint_names(
 811                model,
 812                [old_field.column],
 813                check=True,
 814                exclude=meta_constraint_names,
 815            )
 816            if strict and len(constraint_names) != 1:
 817                raise ValueError(
 818                    f"Found wrong number ({len(constraint_names)}) of check constraints for {model._meta.db_table}.{old_field.column}"
 819                )
 820            for constraint_name in constraint_names:
 821                self.execute(self._delete_check_sql(model, constraint_name))
 822        # Have they renamed the column?
 823        if old_field.column != new_field.column:
 824            self.execute(
 825                self._rename_field_sql(
 826                    model._meta.db_table, old_field, new_field, new_type
 827                )
 828            )
 829            # Rename all references to the renamed column.
 830            for sql in self.deferred_sql:
 831                if isinstance(sql, Statement):
 832                    sql.rename_column_references(
 833                        model._meta.db_table, old_field.column, new_field.column
 834                    )
 835        # Next, start accumulating actions to do
 836        actions = []
 837        null_actions = []
 838        post_actions = []
 839        # Type suffix change? (e.g. auto increment).
 840        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
 841        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
 842        # Type, collation, or comment change?
 843        if (
 844            old_type != new_type
 845            or old_type_suffix != new_type_suffix
 846            or old_collation != new_collation
 847            or (
 848                self.connection.features.supports_comments
 849                and old_field.db_comment != new_field.db_comment
 850            )
 851        ):
 852            fragment, other_actions = self._alter_column_type_sql(
 853                model, old_field, new_field, new_type, old_collation, new_collation
 854            )
 855            actions.append(fragment)
 856            post_actions.extend(other_actions)
 857        # When changing a column NULL constraint to NOT NULL with a given
 858        # default value, we need to perform 4 steps:
 859        #  1. Add a default for new incoming writes
 860        #  2. Update existing NULL rows with new default
 861        #  3. Replace NULL constraint with NOT NULL
 862        #  4. Drop the default again.
 863        # Default change?
 864        needs_database_default = False
 865        if old_field.allow_null and not new_field.allow_null:
 866            old_default = self.effective_default(old_field)
 867            new_default = self.effective_default(new_field)
 868            if (
 869                not self.skip_default_on_alter(new_field)
 870                and old_default != new_default
 871                and new_default is not None
 872            ):
 873                needs_database_default = True
 874                actions.append(
 875                    self._alter_column_default_sql(model, old_field, new_field)
 876                )
 877        # Nullability change?
 878        if old_field.allow_null != new_field.allow_null:
 879            fragment = self._alter_column_null_sql(model, old_field, new_field)
 880            if fragment:
 881                null_actions.append(fragment)
 882        # Only if we have a default and there is a change from NULL to NOT NULL
 883        four_way_default_alteration = new_field.has_default() and (
 884            old_field.allow_null and not new_field.allow_null
 885        )
 886        if actions or null_actions:
 887            if not four_way_default_alteration:
 888                # If we don't have to do a 4-way default alteration we can
 889                # directly run a (NOT) NULL alteration
 890                actions += null_actions
 891            # Combine actions together if we can (e.g. postgres)
 892            if self.connection.features.supports_combined_alters and actions:
 893                sql, params = tuple(zip(*actions))
 894                actions = [(", ".join(sql), sum(params, []))]
 895            # Apply those actions
 896            for sql, params in actions:
 897                self.execute(
 898                    self.sql_alter_column
 899                    % {
 900                        "table": self.quote_name(model._meta.db_table),
 901                        "changes": sql,
 902                    },
 903                    params,
 904                )
 905            if four_way_default_alteration:
 906                # Update existing rows with default value
 907                self.execute(
 908                    self.sql_update_with_default
 909                    % {
 910                        "table": self.quote_name(model._meta.db_table),
 911                        "column": self.quote_name(new_field.column),
 912                        "default": "%s",
 913                    },
 914                    [new_default],
 915                )
 916                # Since we didn't run a NOT NULL change before we need to do it
 917                # now
 918                for sql, params in null_actions:
 919                    self.execute(
 920                        self.sql_alter_column
 921                        % {
 922                            "table": self.quote_name(model._meta.db_table),
 923                            "changes": sql,
 924                        },
 925                        params,
 926                    )
 927        if post_actions:
 928            for sql, params in post_actions:
 929                self.execute(sql, params)
 930        # If primary_key changed to False, delete the primary key constraint.
 931        if old_field.primary_key and not new_field.primary_key:
 932            self._delete_primary_key(model, strict)
 933
 934        # Added an index? Add an index if db_index switched to True or a unique
 935        # constraint will no longer be used in lieu of an index. The following
 936        # lines from the truth table show all True cases; the rest are False:
 937        #
 938        # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
 939        # ------------------------------------------------------------------------------
 940        # False              | False            | True               | False
 941        # False              | True             | True               | False
 942        # True               | True             | True               | False
 943        if (
 944            (
 945                not (old_field.remote_field and old_field.db_index)
 946                or old_field.primary_key
 947            )
 948            and (new_field.remote_field and new_field.db_index)
 949            and not new_field.primary_key
 950        ):
 951            self.execute(self._create_index_sql(model, fields=[new_field]))
 952        # Type alteration on primary key? Then we need to alter the column
 953        # referring to us.
 954        rels_to_update = []
 955        if drop_foreign_keys:
 956            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
 957        # Changed to become primary key?
 958        if self._field_became_primary_key(old_field, new_field):
 959            # Make the new one
 960            self.execute(self._create_primary_key_sql(model, new_field))
 961            # Update all referencing columns
 962            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
 963        # Handle our type alters on the other end of rels from the PK stuff above
 964        for old_rel, new_rel in rels_to_update:
 965            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
 966            rel_type = rel_db_params["type"]
 967            rel_collation = rel_db_params.get("collation")
 968            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
 969            old_rel_collation = old_rel_db_params.get("collation")
 970            fragment, other_actions = self._alter_column_type_sql(
 971                new_rel.related_model,
 972                old_rel.field,
 973                new_rel.field,
 974                rel_type,
 975                old_rel_collation,
 976                rel_collation,
 977            )
 978            self.execute(
 979                self.sql_alter_column
 980                % {
 981                    "table": self.quote_name(new_rel.related_model._meta.db_table),
 982                    "changes": fragment[0],
 983                },
 984                fragment[1],
 985            )
 986            for sql, params in other_actions:
 987                self.execute(sql, params)
 988        # Does it have a foreign key?
 989        if (
 990            self.connection.features.supports_foreign_keys
 991            and new_field.remote_field
 992            and (
 993                fks_dropped or not old_field.remote_field or not old_field.db_constraint
 994            )
 995            and new_field.db_constraint
 996        ):
 997            self.execute(
 998                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
 999            )
1000        # Rebuild FKs that pointed to us if we previously had to drop them
1001        if drop_foreign_keys:
1002            for _, rel in rels_to_update:
1003                if rel.field.db_constraint:
1004                    self.execute(
1005                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
1006                    )
1007        # Does it have check constraints we need to add?
1008        if old_db_check != new_db_check and new_db_check:
1009            constraint_name = self._create_index_name(
1010                model._meta.db_table, [new_field.column], suffix="_check"
1011            )
1012            self.execute(
1013                self._create_check_sql(model, constraint_name, new_db_params["check"])
1014            )
1015        # Drop the default if we need to
1016        # (Plain usually does not use in-database defaults)
1017        if needs_database_default:
1018            changes_sql, params = self._alter_column_default_sql(
1019                model, old_field, new_field, drop=True
1020            )
1021            sql = self.sql_alter_column % {
1022                "table": self.quote_name(model._meta.db_table),
1023                "changes": changes_sql,
1024            }
1025            self.execute(sql, params)
1026        # Reset connection if required
1027        if self.connection.features.connection_persists_old_columns:
1028            self.connection.close()
1029
1030    def _alter_column_null_sql(self, model, old_field, new_field):
1031        """
1032        Hook to specialize column null alteration.
1033
1034        Return a (sql, params) fragment to set a column to null or non-null
1035        as required by new_field, or None if no changes are required.
1036        """
1037        new_db_params = new_field.db_parameters(connection=self.connection)
1038        sql = (
1039            self.sql_alter_column_null
1040            if new_field.allow_null
1041            else self.sql_alter_column_not_null
1042        )
1043        return (
1044            sql
1045            % {
1046                "column": self.quote_name(new_field.column),
1047                "type": new_db_params["type"],
1048            },
1049            [],
1050        )
1051
1052    def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1053        """
1054        Hook to specialize column default alteration.
1055
1056        Return a (sql, params) fragment to add or drop (depending on the drop
1057        argument) a default to new_field's column.
1058        """
1059        new_default = self.effective_default(new_field)
1060        default = self._column_default_sql(new_field)
1061        params = [new_default]
1062
1063        if drop:
1064            params = []
1065        elif self.connection.features.requires_literal_defaults:
1066            # Some databases (Oracle) can't take defaults as a parameter
1067            # If this is the case, the SchemaEditor for that database should
1068            # implement prepare_default().
1069            default = self.prepare_default(new_default)
1070            params = []
1071
1072        new_db_params = new_field.db_parameters(connection=self.connection)
1073        if drop:
1074            if new_field.allow_null:
1075                sql = self.sql_alter_column_no_default_null
1076            else:
1077                sql = self.sql_alter_column_no_default
1078        else:
1079            sql = self.sql_alter_column_default
1080        return (
1081            sql
1082            % {
1083                "column": self.quote_name(new_field.column),
1084                "type": new_db_params["type"],
1085                "default": default,
1086            },
1087            params,
1088        )
1089
1090    def _alter_column_type_sql(
1091        self, model, old_field, new_field, new_type, old_collation, new_collation
1092    ):
1093        """
1094        Hook to specialize column type alteration for different backends,
1095        for cases when a creation type is different to an alteration type
1096        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1097
1098        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1099        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1100        run once the field is altered.
1101        """
1102        other_actions = []
1103        if collate_sql := self._collate_sql(
1104            new_collation, old_collation, model._meta.db_table
1105        ):
1106            collate_sql = f" {collate_sql}"
1107        else:
1108            collate_sql = ""
1109        # Comment change?
1110        comment_sql = ""
1111        if self.connection.features.supports_comments and not new_field.many_to_many:
1112            if old_field.db_comment != new_field.db_comment:
1113                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1114                # 'COMMENT ON ...' at the same time.
1115                sql, params = self._alter_column_comment_sql(
1116                    model, new_field, new_type, new_field.db_comment
1117                )
1118                if sql:
1119                    other_actions.append((sql, params))
1120            if new_field.db_comment:
1121                comment_sql = self._comment_sql(new_field.db_comment)
1122        return (
1123            (
1124                self.sql_alter_column_type
1125                % {
1126                    "column": self.quote_name(new_field.column),
1127                    "type": new_type,
1128                    "collation": collate_sql,
1129                    "comment": comment_sql,
1130                },
1131                [],
1132            ),
1133            other_actions,
1134        )
1135
1136    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1137        return (
1138            self.sql_alter_column_comment
1139            % {
1140                "table": self.quote_name(model._meta.db_table),
1141                "column": self.quote_name(new_field.column),
1142                "comment": self._comment_sql(new_db_comment),
1143            },
1144            [],
1145        )
1146
1147    def _comment_sql(self, comment):
1148        return self.quote_value(comment or "")
1149
1150    def _alter_many_to_many(self, model, old_field, new_field, strict):
1151        """Alter M2Ms to repoint their to= endpoints."""
1152        # Rename the through table
1153        if (
1154            old_field.remote_field.through._meta.db_table
1155            != new_field.remote_field.through._meta.db_table
1156        ):
1157            self.alter_db_table(
1158                old_field.remote_field.through,
1159                old_field.remote_field.through._meta.db_table,
1160                new_field.remote_field.through._meta.db_table,
1161            )
1162        # Repoint the FK to the other side
1163        self.alter_field(
1164            new_field.remote_field.through,
1165            # The field that points to the target model is needed, so we can
1166            # tell alter_field to change it - this is m2m_reverse_field_name()
1167            # (as opposed to m2m_field_name(), which points to our model).
1168            old_field.remote_field.through._meta.get_field(
1169                old_field.m2m_reverse_field_name()
1170            ),
1171            new_field.remote_field.through._meta.get_field(
1172                new_field.m2m_reverse_field_name()
1173            ),
1174        )
1175        self.alter_field(
1176            new_field.remote_field.through,
1177            # for self-referential models we need to alter field from the other end too
1178            old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1179            new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1180        )
1181
1182    def _create_index_name(self, table_name, column_names, suffix=""):
1183        """
1184        Generate a unique name for an index/unique constraint.
1185
1186        The name is divided into 3 parts: the table name, the column names,
1187        and a unique digest and suffix.
1188        """
1189        _, table_name = split_identifier(table_name)
1190        hash_suffix_part = (
1191            f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1192        )
1193        max_length = self.connection.ops.max_name_length() or 200
1194        # If everything fits into max_length, use that name.
1195        index_name = "{}_{}_{}".format(
1196            table_name, "_".join(column_names), hash_suffix_part
1197        )
1198        if len(index_name) <= max_length:
1199            return index_name
1200        # Shorten a long suffix.
1201        if len(hash_suffix_part) > max_length / 3:
1202            hash_suffix_part = hash_suffix_part[: max_length // 3]
1203        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1204        index_name = "{}_{}_{}".format(
1205            table_name[:other_length],
1206            "_".join(column_names)[:other_length],
1207            hash_suffix_part,
1208        )
1209        # Prepend D if needed to prevent the name from starting with an
1210        # underscore or a number (not permitted on Oracle).
1211        if index_name[0] == "_" or index_name[0].isdigit():
1212            index_name = f"D{index_name[:-1]}"
1213        return index_name
1214
1215    def _index_condition_sql(self, condition):
1216        if condition:
1217            return " WHERE " + condition
1218        return ""
1219
1220    def _index_include_sql(self, model, columns):
1221        if not columns or not self.connection.features.supports_covering_indexes:
1222            return ""
1223        return Statement(
1224            " INCLUDE (%(columns)s)",
1225            columns=Columns(model._meta.db_table, columns, self.quote_name),
1226        )
1227
1228    def _create_index_sql(
1229        self,
1230        model,
1231        *,
1232        fields=None,
1233        name=None,
1234        suffix="",
1235        using="",
1236        col_suffixes=(),
1237        sql=None,
1238        opclasses=(),
1239        condition=None,
1240        include=None,
1241        expressions=None,
1242    ):
1243        """
1244        Return the SQL statement to create the index for one or several fields
1245        or expressions. `sql` can be specified if the syntax differs from the
1246        standard (GIS indexes, ...).
1247        """
1248        fields = fields or []
1249        expressions = expressions or []
1250        compiler = Query(model, alias_cols=False).get_compiler()
1251        columns = [field.column for field in fields]
1252        sql_create_index = sql or self.sql_create_index
1253        table = model._meta.db_table
1254
1255        def create_index_name(*args, **kwargs):
1256            nonlocal name
1257            if name is None:
1258                name = self._create_index_name(*args, **kwargs)
1259            return self.quote_name(name)
1260
1261        return Statement(
1262            sql_create_index,
1263            table=Table(table, self.quote_name),
1264            name=IndexName(table, columns, suffix, create_index_name),
1265            using=using,
1266            columns=(
1267                self._index_columns(table, columns, col_suffixes, opclasses)
1268                if columns
1269                else Expressions(table, expressions, compiler, self.quote_value)
1270            ),
1271            extra="",
1272            condition=self._index_condition_sql(condition),
1273            include=self._index_include_sql(model, include),
1274        )
1275
1276    def _delete_index_sql(self, model, name, sql=None):
1277        return Statement(
1278            sql or self.sql_delete_index,
1279            table=Table(model._meta.db_table, self.quote_name),
1280            name=self.quote_name(name),
1281        )
1282
1283    def _rename_index_sql(self, model, old_name, new_name):
1284        return Statement(
1285            self.sql_rename_index,
1286            table=Table(model._meta.db_table, self.quote_name),
1287            old_name=self.quote_name(old_name),
1288            new_name=self.quote_name(new_name),
1289        )
1290
1291    def _index_columns(self, table, columns, col_suffixes, opclasses):
1292        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1293
1294    def _model_indexes_sql(self, model):
1295        """
1296        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1297        """
1298        output = []
1299        for field in model._meta.local_fields:
1300            output.extend(self._field_indexes_sql(model, field))
1301
1302        for index in model._meta.indexes:
1303            if (
1304                not index.contains_expressions
1305                or self.connection.features.supports_expression_indexes
1306            ):
1307                output.append(index.create_sql(model, self))
1308        return output
1309
1310    def _field_indexes_sql(self, model, field):
1311        """
1312        Return a list of all index SQL statements for the specified field.
1313        """
1314        output = []
1315        if self._field_should_be_indexed(model, field):
1316            output.append(self._create_index_sql(model, fields=[field]))
1317        return output
1318
1319    def _field_should_be_altered(self, old_field, new_field, ignore=None):
1320        ignore = ignore or set()
1321        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1322        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1323        # Don't alter when:
1324        # - changing only a field name
1325        # - changing an attribute that doesn't affect the schema
1326        # - changing an attribute in the provided set of ignored attributes
1327        # - adding only a db_column and the column name is not changed
1328        for attr in ignore.union(old_field.non_db_attrs):
1329            old_kwargs.pop(attr, None)
1330        for attr in ignore.union(new_field.non_db_attrs):
1331            new_kwargs.pop(attr, None)
1332        return self.quote_name(old_field.column) != self.quote_name(
1333            new_field.column
1334        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1335
1336    def _field_should_be_indexed(self, model, field):
1337        return (field.remote_field and field.db_index) and not field.primary_key
1338
1339    def _field_became_primary_key(self, old_field, new_field):
1340        return not old_field.primary_key and new_field.primary_key
1341
1342    def _rename_field_sql(self, table, old_field, new_field, new_type):
1343        return self.sql_rename_column % {
1344            "table": self.quote_name(table),
1345            "old_column": self.quote_name(old_field.column),
1346            "new_column": self.quote_name(new_field.column),
1347            "type": new_type,
1348        }
1349
1350    def _create_fk_sql(self, model, field, suffix):
1351        table = Table(model._meta.db_table, self.quote_name)
1352        name = self._fk_constraint_name(model, field, suffix)
1353        column = Columns(model._meta.db_table, [field.column], self.quote_name)
1354        to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1355        to_column = Columns(
1356            field.target_field.model._meta.db_table,
1357            [field.target_field.column],
1358            self.quote_name,
1359        )
1360        deferrable = self.connection.ops.deferrable_sql()
1361        return Statement(
1362            self.sql_create_fk,
1363            table=table,
1364            name=name,
1365            column=column,
1366            to_table=to_table,
1367            to_column=to_column,
1368            deferrable=deferrable,
1369        )
1370
1371    def _fk_constraint_name(self, model, field, suffix):
1372        def create_fk_name(*args, **kwargs):
1373            return self.quote_name(self._create_index_name(*args, **kwargs))
1374
1375        return ForeignKeyName(
1376            model._meta.db_table,
1377            [field.column],
1378            split_identifier(field.target_field.model._meta.db_table)[1],
1379            [field.target_field.column],
1380            suffix,
1381            create_fk_name,
1382        )
1383
1384    def _delete_fk_sql(self, model, name):
1385        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1386
1387    def _deferrable_constraint_sql(self, deferrable):
1388        if deferrable is None:
1389            return ""
1390        if deferrable == Deferrable.DEFERRED:
1391            return " DEFERRABLE INITIALLY DEFERRED"
1392        if deferrable == Deferrable.IMMEDIATE:
1393            return " DEFERRABLE INITIALLY IMMEDIATE"
1394
1395    def _unique_sql(
1396        self,
1397        model,
1398        fields,
1399        name,
1400        condition=None,
1401        deferrable=None,
1402        include=None,
1403        opclasses=None,
1404        expressions=None,
1405    ):
1406        if (
1407            deferrable
1408            and not self.connection.features.supports_deferrable_unique_constraints
1409        ):
1410            return None
1411        if condition or include or opclasses or expressions:
1412            # Databases support conditional, covering, and functional unique
1413            # constraints via a unique index.
1414            sql = self._create_unique_sql(
1415                model,
1416                fields,
1417                name=name,
1418                condition=condition,
1419                include=include,
1420                opclasses=opclasses,
1421                expressions=expressions,
1422            )
1423            if sql:
1424                self.deferred_sql.append(sql)
1425            return None
1426        constraint = self.sql_unique_constraint % {
1427            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1428            "deferrable": self._deferrable_constraint_sql(deferrable),
1429        }
1430        return self.sql_constraint % {
1431            "name": self.quote_name(name),
1432            "constraint": constraint,
1433        }
1434
1435    def _create_unique_sql(
1436        self,
1437        model,
1438        fields,
1439        name=None,
1440        condition=None,
1441        deferrable=None,
1442        include=None,
1443        opclasses=None,
1444        expressions=None,
1445    ):
1446        if (
1447            (
1448                deferrable
1449                and not self.connection.features.supports_deferrable_unique_constraints
1450            )
1451            or (condition and not self.connection.features.supports_partial_indexes)
1452            or (include and not self.connection.features.supports_covering_indexes)
1453            or (
1454                expressions and not self.connection.features.supports_expression_indexes
1455            )
1456        ):
1457            return None
1458
1459        compiler = Query(model, alias_cols=False).get_compiler()
1460        table = model._meta.db_table
1461        columns = [field.column for field in fields]
1462        if name is None:
1463            name = self._unique_constraint_name(table, columns, quote=True)
1464        else:
1465            name = self.quote_name(name)
1466        if condition or include or opclasses or expressions:
1467            sql = self.sql_create_unique_index
1468        else:
1469            sql = self.sql_create_unique
1470        if columns:
1471            columns = self._index_columns(
1472                table, columns, col_suffixes=(), opclasses=opclasses
1473            )
1474        else:
1475            columns = Expressions(table, expressions, compiler, self.quote_value)
1476        return Statement(
1477            sql,
1478            table=Table(table, self.quote_name),
1479            name=name,
1480            columns=columns,
1481            condition=self._index_condition_sql(condition),
1482            deferrable=self._deferrable_constraint_sql(deferrable),
1483            include=self._index_include_sql(model, include),
1484        )
1485
1486    def _unique_constraint_name(self, table, columns, quote=True):
1487        if quote:
1488
1489            def create_unique_name(*args, **kwargs):
1490                return self.quote_name(self._create_index_name(*args, **kwargs))
1491
1492        else:
1493            create_unique_name = self._create_index_name
1494
1495        return IndexName(table, columns, "_uniq", create_unique_name)
1496
1497    def _delete_unique_sql(
1498        self,
1499        model,
1500        name,
1501        condition=None,
1502        deferrable=None,
1503        include=None,
1504        opclasses=None,
1505        expressions=None,
1506    ):
1507        if (
1508            (
1509                deferrable
1510                and not self.connection.features.supports_deferrable_unique_constraints
1511            )
1512            or (condition and not self.connection.features.supports_partial_indexes)
1513            or (include and not self.connection.features.supports_covering_indexes)
1514            or (
1515                expressions and not self.connection.features.supports_expression_indexes
1516            )
1517        ):
1518            return None
1519        if condition or include or opclasses or expressions:
1520            sql = self.sql_delete_index
1521        else:
1522            sql = self.sql_delete_unique
1523        return self._delete_constraint_sql(sql, model, name)
1524
1525    def _check_sql(self, name, check):
1526        return self.sql_constraint % {
1527            "name": self.quote_name(name),
1528            "constraint": self.sql_check_constraint % {"check": check},
1529        }
1530
1531    def _create_check_sql(self, model, name, check):
1532        if not self.connection.features.supports_table_check_constraints:
1533            return None
1534        return Statement(
1535            self.sql_create_check,
1536            table=Table(model._meta.db_table, self.quote_name),
1537            name=self.quote_name(name),
1538            check=check,
1539        )
1540
1541    def _delete_check_sql(self, model, name):
1542        if not self.connection.features.supports_table_check_constraints:
1543            return None
1544        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1545
1546    def _delete_constraint_sql(self, template, model, name):
1547        return Statement(
1548            template,
1549            table=Table(model._meta.db_table, self.quote_name),
1550            name=self.quote_name(name),
1551        )
1552
1553    def _constraint_names(
1554        self,
1555        model,
1556        column_names=None,
1557        unique=None,
1558        primary_key=None,
1559        index=None,
1560        foreign_key=None,
1561        check=None,
1562        type_=None,
1563        exclude=None,
1564    ):
1565        """Return all constraint names matching the columns and conditions."""
1566        if column_names is not None:
1567            column_names = [
1568                self.connection.introspection.identifier_converter(
1569                    truncate_name(name, self.connection.ops.max_name_length())
1570                )
1571                if self.connection.features.truncates_names
1572                else self.connection.introspection.identifier_converter(name)
1573                for name in column_names
1574            ]
1575        with self.connection.cursor() as cursor:
1576            constraints = self.connection.introspection.get_constraints(
1577                cursor, model._meta.db_table
1578            )
1579        result = []
1580        for name, infodict in constraints.items():
1581            if column_names is None or column_names == infodict["columns"]:
1582                if unique is not None and infodict["unique"] != unique:
1583                    continue
1584                if primary_key is not None and infodict["primary_key"] != primary_key:
1585                    continue
1586                if index is not None and infodict["index"] != index:
1587                    continue
1588                if check is not None and infodict["check"] != check:
1589                    continue
1590                if foreign_key is not None and not infodict["foreign_key"]:
1591                    continue
1592                if type_ is not None and infodict["type"] != type_:
1593                    continue
1594                if not exclude or name not in exclude:
1595                    result.append(name)
1596        return result
1597
1598    def _delete_primary_key(self, model, strict=False):
1599        constraint_names = self._constraint_names(model, primary_key=True)
1600        if strict and len(constraint_names) != 1:
1601            raise ValueError(
1602                f"Found wrong number ({len(constraint_names)}) of PK constraints for {model._meta.db_table}"
1603            )
1604        for constraint_name in constraint_names:
1605            self.execute(self._delete_primary_key_sql(model, constraint_name))
1606
1607    def _create_primary_key_sql(self, model, field):
1608        return Statement(
1609            self.sql_create_pk,
1610            table=Table(model._meta.db_table, self.quote_name),
1611            name=self.quote_name(
1612                self._create_index_name(
1613                    model._meta.db_table, [field.column], suffix="_pk"
1614                )
1615            ),
1616            columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1617        )
1618
1619    def _delete_primary_key_sql(self, model, name):
1620        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1621
1622    def _collate_sql(self, collation, old_collation=None, table_name=None):
1623        return "COLLATE " + self.quote_name(collation) if collation else ""