Plain is headed towards 1.0! Subscribe for development updates →

   1import logging
   2import operator
   3from datetime import datetime
   4
   5from plain.models.backends.ddl_references import (
   6    Columns,
   7    Expressions,
   8    ForeignKeyName,
   9    IndexName,
  10    Statement,
  11    Table,
  12)
  13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
  14from plain.models.constraints import Deferrable
  15from plain.models.indexes import Index
  16from plain.models.sql import Query
  17from plain.models.transaction import TransactionManagementError, atomic
  18from plain.runtime import settings
  19from plain.utils import timezone
  20
  21logger = logging.getLogger("plain.models.backends.schema")
  22
  23
  24def _is_relevant_relation(relation, altered_field):
  25    """
  26    When altering the given field, must constraints on its model from the given
  27    relation be temporarily dropped?
  28    """
  29    field = relation.field
  30    if field.many_to_many:
  31        # M2M reverse field
  32        return False
  33    if altered_field.primary_key and field.to_fields == [None]:
  34        # Foreign key constraint on the primary key, which is being altered.
  35        return True
  36    # Is the constraint targeting the field being altered?
  37    return altered_field.name in field.to_fields
  38
  39
  40def _all_related_fields(model):
  41    # Related fields must be returned in a deterministic order.
  42    return sorted(
  43        model._meta._get_fields(
  44            forward=False,
  45            reverse=True,
  46            include_hidden=True,
  47            include_parents=False,
  48        ),
  49        key=operator.attrgetter("name"),
  50    )
  51
  52
  53def _related_non_m2m_objects(old_field, new_field):
  54    # Filter out m2m objects from reverse relations.
  55    # Return (old_relation, new_relation) tuples.
  56    related_fields = zip(
  57        (
  58            obj
  59            for obj in _all_related_fields(old_field.model)
  60            if _is_relevant_relation(obj, old_field)
  61        ),
  62        (
  63            obj
  64            for obj in _all_related_fields(new_field.model)
  65            if _is_relevant_relation(obj, new_field)
  66        ),
  67    )
  68    for old_rel, new_rel in related_fields:
  69        yield old_rel, new_rel
  70        yield from _related_non_m2m_objects(
  71            old_rel.remote_field,
  72            new_rel.remote_field,
  73        )
  74
  75
  76class BaseDatabaseSchemaEditor:
  77    """
  78    This class and its subclasses are responsible for emitting schema-changing
  79    statements to the databases - model creation/removal/alteration, field
  80    renaming, index fiddling, and so on.
  81    """
  82
    # Overrideable SQL templates
    #
    # Each template is a %-style format string with named placeholders; the
    # methods of this class fill them in with `template % {...}`. Subclasses
    # may replace any of them.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be substituted as %(changes)s into sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Same statement as sql_alter_column_no_default unless a subclass overrides it.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and the generic DROP CONSTRAINT statement that the
    # sql_delete_* templates below alias.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # None means no inline variant is defined here: table_sql() and
    # add_field() only use these templates when they are truthy and otherwise
    # defer a separate foreign key statement.
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Index statements.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary key statements.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    sql_delete_procedure = "DROP PROCEDURE %(procedure)s"

    # Comment statements for tables and columns.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
 147
 148    def __init__(self, connection, collect_sql=False, atomic=True):
 149        self.connection = connection
 150        self.collect_sql = collect_sql
 151        if self.collect_sql:
 152            self.collected_sql = []
 153        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
 154
 155    # State-managing methods
 156
 157    def __enter__(self):
 158        self.deferred_sql = []
 159        if self.atomic_migration:
 160            self.atomic = atomic(self.connection.alias)
 161            self.atomic.__enter__()
 162        return self
 163
 164    def __exit__(self, exc_type, exc_value, traceback):
 165        if exc_type is None:
 166            for sql in self.deferred_sql:
 167                self.execute(sql)
 168        if self.atomic_migration:
 169            self.atomic.__exit__(exc_type, exc_value, traceback)
 170
 171    # Core utility functions
 172
 173    def execute(self, sql, params=()):
 174        """Execute the given SQL statement, with optional parameters."""
 175        # Don't perform the transactional DDL check if SQL is being collected
 176        # as it's not going to be executed anyway.
 177        if (
 178            not self.collect_sql
 179            and self.connection.in_atomic_block
 180            and not self.connection.features.can_rollback_ddl
 181        ):
 182            raise TransactionManagementError(
 183                "Executing DDL statements while in a transaction on databases "
 184                "that can't perform a rollback is prohibited."
 185            )
 186        # Account for non-string statement objects.
 187        sql = str(sql)
 188        # Log the command we're running, then run it
 189        logger.debug(
 190            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
 191        )
 192        if self.collect_sql:
 193            ending = "" if sql.rstrip().endswith(";") else ";"
 194            if params is not None:
 195                self.collected_sql.append(
 196                    (sql % tuple(map(self.quote_value, params))) + ending
 197                )
 198            else:
 199                self.collected_sql.append(sql + ending)
 200        else:
 201            with self.connection.cursor() as cursor:
 202                cursor.execute(sql, params)
 203
 204    def quote_name(self, name):
 205        return self.connection.ops.quote_name(name)
 206
    def table_sql(self, model):
        """
        Take a model and return its table definition.

        Returns a `(sql, params)` pair: the CREATE TABLE statement built from
        `sql_create_table`, and the list of parameters collected from the
        column definitions.

        Side effect: statements that cannot be part of the CREATE TABLE
        (foreign keys when no inline template is set, and any autoincrement
        SQL returned by the backend) are appended to `self.deferred_sql`.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            # column_sql() returns None for fields without a column type.
            if definition is None:
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += " %s" % col_type_suffix
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                # Prefer the inline template when the backend defines one;
                # otherwise defer a separate statement if FKs are supported.
                if self.sql_create_inline_fk:
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in (
                "AutoField",
                "BigAutoField",
                "SmallAutoField",
            ):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model._meta.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model._meta.constraints
        ]
        # Columns come first, then model constraints; falsy entries (e.g. a
        # constraint that produced no SQL) are left out of the definition.
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        if model._meta.db_tablespace:
            tablespace_sql = self.connection.ops.tablespace_sql(
                model._meta.db_tablespace
            )
            if tablespace_sql:
                sql += " " + tablespace_sql
        return sql, params
 276
 277    # Field <-> database mapping functions
 278
    def _iter_column_sql(
        self, column_db_type, params, model, field, field_db_params, include_default
    ):
        """
        Yield, in order, the SQL fragments that make up a column definition:
        type, collation, inline comment, default, nullability, PRIMARY KEY or
        UNIQUE, and tablespace.

        `params` is mutated in place: when a default is emitted as a
        placeholder, its value is appended to the list. Because this is a
        generator, that only happens once the fragments are consumed.
        """
        yield column_db_type
        if collation := field_db_params.get("collation"):
            yield self._collate_sql(collation)
        if self.connection.features.supports_comments_inline and field.db_comment:
            yield self._comment_sql(field.db_comment)
        # Work out nullability.
        null = field.null
        # Include a default value, if requested.
        include_default = (
            include_default
            and not self.skip_default(field)
            and
            # Don't include a default value if it's a nullable field and the
            # default cannot be dropped in the ALTER COLUMN statement (e.g.
            # MySQL longtext and longblob).
            not (null and self.skip_default_on_alter(field))
        )
        if include_default:
            default_value = self.effective_default(field)
            # A None effective default produces no DEFAULT clause at all.
            if default_value is not None:
                column_default = "DEFAULT " + self._column_default_sql(field)
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter (Oracle).
                    # If this is the case, the individual schema backend should
                    # implement prepare_default().
                    yield column_default % self.prepare_default(default_value)
                else:
                    yield column_default
                    params.append(default_value)
        # Oracle treats the empty string ('') as null, so coerce the null
        # option whenever '' is a possible value.
        if (
            field.empty_strings_allowed
            and not field.primary_key
            and self.connection.features.interprets_empty_strings_as_nulls
        ):
            null = True
        if not null:
            yield "NOT NULL"
        elif not self.connection.features.implied_column_null:
            yield "NULL"
        if field.primary_key:
            yield "PRIMARY KEY"
        elif field.unique:
            yield "UNIQUE"
        # Optionally add the tablespace if it's an implicitly indexed column.
        tablespace = field.db_tablespace or model._meta.db_tablespace
        if (
            tablespace
            and self.connection.features.supports_tablespaces
            and field.unique
        ):
            yield self.connection.ops.tablespace_sql(tablespace, inline=True)
 335
 336    def column_sql(self, model, field, include_default=False):
 337        """
 338        Return the column definition for a field. The field must already have
 339        had set_attributes_from_name() called.
 340        """
 341        # Get the column's type and use that as the basis of the SQL.
 342        field_db_params = field.db_parameters(connection=self.connection)
 343        column_db_type = field_db_params["type"]
 344        # Check for fields that aren't actually columns (e.g. M2M).
 345        if column_db_type is None:
 346            return None, None
 347        params = []
 348        return (
 349            " ".join(
 350                # This appends to the params being returned.
 351                self._iter_column_sql(
 352                    column_db_type,
 353                    params,
 354                    model,
 355                    field,
 356                    field_db_params,
 357                    include_default,
 358                )
 359            ),
 360            params,
 361        )
 362
 363    def skip_default(self, field):
 364        """
 365        Some backends don't accept default values for certain columns types
 366        (i.e. MySQL longtext and longblob).
 367        """
 368        return False
 369
 370    def skip_default_on_alter(self, field):
 371        """
 372        Some backends don't accept default values for certain columns types
 373        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
 374        """
 375        return False
 376
 377    def prepare_default(self, value):
 378        """
 379        Only used for backends which have requires_literal_defaults feature
 380        """
 381        raise NotImplementedError(
 382            "subclasses of BaseDatabaseSchemaEditor for backends which have "
 383            "requires_literal_defaults must provide a prepare_default() method"
 384        )
 385
 386    def _column_default_sql(self, field):
 387        """
 388        Return the SQL to use in a DEFAULT clause. The resulting string should
 389        contain a '%s' placeholder for a default value.
 390        """
 391        return "%s"
 392
 393    @staticmethod
 394    def _effective_default(field):
 395        # This method allows testing its logic without a connection.
 396        if field.has_default():
 397            default = field.get_default()
 398        elif not field.null and field.blank and field.empty_strings_allowed:
 399            if field.get_internal_type() == "BinaryField":
 400                default = b""
 401            else:
 402                default = ""
 403        elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
 404            internal_type = field.get_internal_type()
 405            if internal_type == "DateTimeField":
 406                default = timezone.now()
 407            else:
 408                default = datetime.now()
 409                if internal_type == "DateField":
 410                    default = default.date()
 411                elif internal_type == "TimeField":
 412                    default = default.time()
 413        else:
 414            default = None
 415        return default
 416
 417    def effective_default(self, field):
 418        """Return a field's effective database default value."""
 419        return field.get_db_prep_save(self._effective_default(field), self.connection)
 420
 421    def quote_value(self, value):
 422        """
 423        Return a quoted version of the value so it's safe to use in an SQL
 424        string. This is not safe against injection from user code; it is
 425        intended only for use in making SQL scripts or preparing default values
 426        for particularly tricky backends (defaults are not user-defined, though,
 427        so this is safe).
 428        """
 429        raise NotImplementedError()
 430
 431    # Actions
 432
 433    def create_model(self, model):
 434        """
 435        Create a table and any accompanying indexes or unique constraints for
 436        the given `model`.
 437        """
 438        sql, params = self.table_sql(model)
 439        # Prevent using [] as params, in the case a literal '%' is used in the
 440        # definition.
 441        self.execute(sql, params or None)
 442
 443        if self.connection.features.supports_comments:
 444            # Add table comment.
 445            if model._meta.db_table_comment:
 446                self.alter_db_table_comment(model, None, model._meta.db_table_comment)
 447            # Add column comments.
 448            if not self.connection.features.supports_comments_inline:
 449                for field in model._meta.local_fields:
 450                    if field.db_comment:
 451                        field_db_params = field.db_parameters(
 452                            connection=self.connection
 453                        )
 454                        field_type = field_db_params["type"]
 455                        self.execute(
 456                            *self._alter_column_comment_sql(
 457                                model, field, field_type, field.db_comment
 458                            )
 459                        )
 460        # Add any field index (deferred as SQLite _remake_table needs it).
 461        self.deferred_sql.extend(self._model_indexes_sql(model))
 462
 463        # Make M2M tables
 464        for field in model._meta.local_many_to_many:
 465            if field.remote_field.through._meta.auto_created:
 466                self.create_model(field.remote_field.through)
 467
 468    def delete_model(self, model):
 469        """Delete a model from the database."""
 470        # Handle auto-created intermediary models
 471        for field in model._meta.local_many_to_many:
 472            if field.remote_field.through._meta.auto_created:
 473                self.delete_model(field.remote_field.through)
 474
 475        # Delete the table
 476        self.execute(
 477            self.sql_delete_table
 478            % {
 479                "table": self.quote_name(model._meta.db_table),
 480            }
 481        )
 482        # Remove all deferred statements referencing the deleted table.
 483        for sql in list(self.deferred_sql):
 484            if isinstance(sql, Statement) and sql.references_table(
 485                model._meta.db_table
 486            ):
 487                self.deferred_sql.remove(sql)
 488
 489    def add_index(self, model, index):
 490        """Add an index on a model."""
 491        if (
 492            index.contains_expressions
 493            and not self.connection.features.supports_expression_indexes
 494        ):
 495            return None
 496        # Index.create_sql returns interpolated SQL which makes params=None a
 497        # necessity to avoid escaping attempts on execution.
 498        self.execute(index.create_sql(model, self), params=None)
 499
 500    def remove_index(self, model, index):
 501        """Remove an index from a model."""
 502        if (
 503            index.contains_expressions
 504            and not self.connection.features.supports_expression_indexes
 505        ):
 506            return None
 507        self.execute(index.remove_sql(model, self))
 508
 509    def rename_index(self, model, old_index, new_index):
 510        if self.connection.features.can_rename_index:
 511            self.execute(
 512                self._rename_index_sql(model, old_index.name, new_index.name),
 513                params=None,
 514            )
 515        else:
 516            self.remove_index(model, old_index)
 517            self.add_index(model, new_index)
 518
 519    def add_constraint(self, model, constraint):
 520        """Add a constraint to a model."""
 521        sql = constraint.create_sql(model, self)
 522        if sql:
 523            # Constraint.create_sql returns interpolated SQL which makes
 524            # params=None a necessity to avoid escaping attempts on execution.
 525            self.execute(sql, params=None)
 526
 527    def remove_constraint(self, model, constraint):
 528        """Remove a constraint from a model."""
 529        sql = constraint.remove_sql(model, self)
 530        if sql:
 531            self.execute(sql)
 532
 533    def alter_db_table(self, model, old_db_table, new_db_table):
 534        """Rename the table a model points to."""
 535        if old_db_table == new_db_table or (
 536            self.connection.features.ignores_table_name_case
 537            and old_db_table.lower() == new_db_table.lower()
 538        ):
 539            return
 540        self.execute(
 541            self.sql_rename_table
 542            % {
 543                "old_table": self.quote_name(old_db_table),
 544                "new_table": self.quote_name(new_db_table),
 545            }
 546        )
 547        # Rename all references to the old table name.
 548        for sql in self.deferred_sql:
 549            if isinstance(sql, Statement):
 550                sql.rename_table_references(old_db_table, new_db_table)
 551
 552    def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
 553        self.execute(
 554            self.sql_alter_table_comment
 555            % {
 556                "table": self.quote_name(model._meta.db_table),
 557                "comment": self.quote_value(new_db_table_comment or ""),
 558            }
 559        )
 560
 561    def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace):
 562        """Move a model's table between tablespaces."""
 563        self.execute(
 564            self.sql_retablespace_table
 565            % {
 566                "table": self.quote_name(model._meta.db_table),
 567                "old_tablespace": self.quote_name(old_db_tablespace),
 568                "new_tablespace": self.quote_name(new_db_tablespace),
 569            }
 570        )
 571
    def add_field(self, model, field):
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).

        For a column-backed field this runs, in order: the ADD COLUMN
        statement (with the effective default included), a statement dropping
        that default again, and a separate column comment statement where the
        backend needs one. Foreign key constraints without an inline template
        and field indexes are appended to `self.deferred_sql` rather than
        executed here.
        """
        # Special-case implicit M2M tables
        if field.many_to_many and field.remote_field.through._meta.auto_created:
            return self.create_model(field.remote_field.through)
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint
        ):
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                # Only the namespace part of the table identifier is needed.
                namespace, _ = split_identifier(model._meta.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": "%s." % self.quote_name(namespace)
                    if namespace
                    else "",
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model._meta.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            # `params` is rebound here to the parameters of the DROP DEFAULT
            # change; the ADD COLUMN parameters are no longer needed.
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
 657
 658    def remove_field(self, model, field):
 659        """
 660        Remove a field from a model. Usually involves deleting a column,
 661        but for M2Ms may involve deleting a table.
 662        """
 663        # Special-case implicit M2M tables
 664        if field.many_to_many and field.remote_field.through._meta.auto_created:
 665            return self.delete_model(field.remote_field.through)
 666        # It might not actually have a column behind it
 667        if field.db_parameters(connection=self.connection)["type"] is None:
 668            return
 669        # Drop any FK constraints, MySQL requires explicit deletion
 670        if field.remote_field:
 671            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
 672            for fk_name in fk_names:
 673                self.execute(self._delete_fk_sql(model, fk_name))
 674        # Delete the column
 675        sql = self.sql_delete_column % {
 676            "table": self.quote_name(model._meta.db_table),
 677            "column": self.quote_name(field.column),
 678        }
 679        self.execute(sql)
 680        # Reset connection if required
 681        if self.connection.features.connection_persists_old_columns:
 682            self.connection.close()
 683        # Remove all deferred statements referencing the deleted column.
 684        for sql in list(self.deferred_sql):
 685            if isinstance(sql, Statement) and sql.references_column(
 686                model._meta.db_table, field.column
 687            ):
 688                self.deferred_sql.remove(sql)
 689
    def alter_field(self, model, old_field, new_field, strict=False):
        """
        Allow a field's type, uniqueness, nullability, default, column,
        constraints, etc. to be modified.
        `old_field` is required to compute the necessary changes.
        If `strict` is True, raise errors if the old column does not match
        `old_field` precisely.

        Raises ValueError when either field has neither a database type nor a
        remote field, or when exactly one side is column-less (altering to or
        from an M2M, or adding/removing `through=`).
        """
        if not self._field_should_be_altered(old_field, new_field):
            return
        # Ensure this field is even column-based
        old_db_params = old_field.db_parameters(connection=self.connection)
        old_type = old_db_params["type"]
        new_db_params = new_field.db_parameters(connection=self.connection)
        new_type = new_db_params["type"]
        if (old_type is None and old_field.remote_field is None) or (
            new_type is None and new_field.remote_field is None
        ):
            raise ValueError(
                "Cannot alter field {} into {} - they do not properly define "
                "db_type (are you using a badly-written custom field?)".format(
                    old_field, new_field
                ),
            )
        elif (
            old_type is None
            and new_type is None
            and (
                old_field.remote_field.through
                and new_field.remote_field.through
                and old_field.remote_field.through._meta.auto_created
                and new_field.remote_field.through._meta.auto_created
            )
        ):
            # Both sides use auto-created through models: handled separately.
            return self._alter_many_to_many(model, old_field, new_field, strict)
        elif (
            old_type is None
            and new_type is None
            and (
                old_field.remote_field.through
                and new_field.remote_field.through
                and not old_field.remote_field.through._meta.auto_created
                and not new_field.remote_field.through._meta.auto_created
            )
        ):
            # Both sides have through models; this is a no-op.
            return
        elif old_type is None or new_type is None:
            raise ValueError(
                "Cannot alter field {} into {} - they are not compatible types "
                "(you cannot alter to or from M2M fields, or add or remove "
                "through= on M2M fields)".format(old_field, new_field)
            )

        # Both fields are column-backed: perform the physical alteration.
        self._alter_field(
            model,
            old_field,
            new_field,
            old_type,
            new_type,
            old_db_params,
            new_db_params,
            strict,
        )
 754
 755    def _field_db_check(self, field, field_db_params):
 756        # Always check constraints with the same mocked column name to avoid
 757        # recreating constrains when the column is renamed.
 758        check_constraints = self.connection.data_type_check_constraints
 759        data = field.db_type_parameters(self.connection)
 760        data["column"] = "__column_name__"
 761        try:
 762            return check_constraints[field.get_internal_type()] % data
 763        except KeyError:
 764            return None
 765
    def _alter_field(
        self,
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict=False,
    ):
        """
        Perform a "physical" (non-ManyToMany) field update.

        Statements are executed in a fixed order: constraints and indexes
        tied to the old definition are dropped first, the column is then
        renamed and altered, and finally the primary key, unique constraint,
        index, foreign keys, and check constraint required by `new_field`
        are (re)created.

        `old_type`/`new_type` and `old_db_params`/`new_db_params` are the
        values `alter_field()` obtained from each field's `db_parameters()`.
        If `strict` is True, raise ValueError when the number of foreign key,
        unique, or check constraints found for the old column is not exactly
        one.
        """
        # Drop any FK constraints, we'll remake them later
        fks_dropped = set()
        if (
            self.connection.features.supports_foreign_keys
            and old_field.remote_field
            and old_field.db_constraint
            and self._field_should_be_altered(
                old_field,
                new_field,
                ignore={"db_comment"},
            )
        ):
            fk_names = self._constraint_names(
                model, [old_field.column], foreign_key=True
            )
            if strict and len(fk_names) != 1:
                raise ValueError(
                    "Found wrong number ({}) of foreign key constraints for {}.{}".format(
                        len(fk_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for fk_name in fk_names:
                # A non-empty set is what later triggers re-creation of this
                # field's own foreign key.
                fks_dropped.add((old_field.column,))
                self.execute(self._delete_fk_sql(model, fk_name))
        # Has unique been removed?
        if old_field.unique and (
            not new_field.unique or self._field_became_primary_key(old_field, new_field)
        ):
            # Find the unique constraint for this field, ignoring constraints
            # declared explicitly on the model's Meta.
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                unique=True,
                primary_key=False,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    "Found wrong number ({}) of unique constraints for {}.{}".format(
                        len(constraint_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_unique_sql(model, constraint_name))
        # Drop incoming FK constraints if the field is a primary key or unique,
        # which might be a to_field target, and things are going to change.
        old_collation = old_db_params.get("collation")
        new_collation = new_db_params.get("collation")
        drop_foreign_keys = (
            self.connection.features.supports_foreign_keys
            and (
                (old_field.primary_key and new_field.primary_key)
                or (old_field.unique and new_field.unique)
            )
            and ((old_type != new_type) or (old_collation != new_collation))
        )
        if drop_foreign_keys:
            # '_meta.related_field' also contains M2M reverse fields, these
            # will be filtered out
            for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
                rel_fk_names = self._constraint_names(
                    new_rel.related_model, [new_rel.field.column], foreign_key=True
                )
                for fk_name in rel_fk_names:
                    self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
        # Removed an index? (no strict check, as multiple indexes are possible)
        # Remove indexes if db_index switched to False or a unique constraint
        # will now be used in lieu of an index. The following lines from the
        # truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
        # ------------------------------------------------------------------------------
        # True               | False            | False              | False
        # True               | False            | False              | True
        # True               | False            | True               | True
        if (
            old_field.db_index
            and not old_field.unique
            and (not new_field.db_index or new_field.unique)
        ):
            # Find the index for this field
            meta_index_names = {index.name for index in model._meta.indexes}
            # Retrieve only BTREE indexes since this is what's created with
            # db_index=True.
            index_names = self._constraint_names(
                model,
                [old_field.column],
                index=True,
                type_=Index.suffix,
                exclude=meta_index_names,
            )
            for index_name in index_names:
                # The only way to check if an index was created with
                # db_index=True or with Index(['field'], name='foo')
                # is to look at its name (refs #28053).
                self.execute(self._delete_index_sql(model, index_name))
        # Change check constraints?
        old_db_check = self._field_db_check(old_field, old_db_params)
        new_db_check = self._field_db_check(new_field, new_db_params)
        if old_db_check != new_db_check and old_db_check:
            meta_constraint_names = {
                constraint.name for constraint in model._meta.constraints
            }
            constraint_names = self._constraint_names(
                model,
                [old_field.column],
                check=True,
                exclude=meta_constraint_names,
            )
            if strict and len(constraint_names) != 1:
                raise ValueError(
                    "Found wrong number ({}) of check constraints for {}.{}".format(
                        len(constraint_names),
                        model._meta.db_table,
                        old_field.column,
                    )
                )
            for constraint_name in constraint_names:
                self.execute(self._delete_check_sql(model, constraint_name))
        # Have they renamed the column?
        if old_field.column != new_field.column:
            self.execute(
                self._rename_field_sql(
                    model._meta.db_table, old_field, new_field, new_type
                )
            )
            # Rename all references to the renamed column in statements that
            # are still waiting in deferred_sql.
            for sql in self.deferred_sql:
                if isinstance(sql, Statement):
                    sql.rename_column_references(
                        model._meta.db_table, old_field.column, new_field.column
                    )
        # Next, start accumulating actions to do. Each entry is a
        # (sql, params) pair; null_actions are kept apart so they can be
        # delayed, and post_actions run after the ALTER statements.
        actions = []
        null_actions = []
        post_actions = []
        # Type suffix change? (e.g. auto increment).
        old_type_suffix = old_field.db_type_suffix(connection=self.connection)
        new_type_suffix = new_field.db_type_suffix(connection=self.connection)
        # Type, collation, or comment change?
        if (
            old_type != new_type
            or old_type_suffix != new_type_suffix
            or old_collation != new_collation
            or (
                self.connection.features.supports_comments
                and old_field.db_comment != new_field.db_comment
            )
        ):
            fragment, other_actions = self._alter_column_type_sql(
                model, old_field, new_field, new_type, old_collation, new_collation
            )
            actions.append(fragment)
            post_actions.extend(other_actions)
        # When changing a column NULL constraint to NOT NULL with a given
        # default value, we need to perform 4 steps:
        #  1. Add a default for new incoming writes
        #  2. Update existing NULL rows with new default
        #  3. Replace NULL constraint with NOT NULL
        #  4. Drop the default again.
        # Default change?
        needs_database_default = False
        if old_field.null and not new_field.null:
            old_default = self.effective_default(old_field)
            new_default = self.effective_default(new_field)
            if (
                not self.skip_default_on_alter(new_field)
                and old_default != new_default
                and new_default is not None
            ):
                needs_database_default = True
                actions.append(
                    self._alter_column_default_sql(model, old_field, new_field)
                )
        # Nullability change?
        if old_field.null != new_field.null:
            fragment = self._alter_column_null_sql(model, old_field, new_field)
            if fragment:
                null_actions.append(fragment)
        # Only if we have a default and there is a change from NULL to NOT NULL
        four_way_default_alteration = new_field.has_default() and (
            old_field.null and not new_field.null
        )
        if actions or null_actions:
            if not four_way_default_alteration:
                # If we don't have to do a 4-way default alteration we can
                # directly run a (NOT) NULL alteration
                actions += null_actions
            # Combine actions together if we can (e.g. postgres)
            if self.connection.features.supports_combined_alters and actions:
                # Transpose [(sql, params), ...] into (sqls, param_lists) and
                # merge them into one comma-separated change list.
                sql, params = tuple(zip(*actions))
                actions = [(", ".join(sql), sum(params, []))]
            # Apply those actions
            for sql, params in actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "changes": sql,
                    },
                    params,
                )
            if four_way_default_alteration:
                # Update existing rows with default value. new_default is
                # bound here because this branch implies the NULL -> NOT NULL
                # case handled above.
                self.execute(
                    self.sql_update_with_default
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "column": self.quote_name(new_field.column),
                        "default": "%s",
                    },
                    [new_default],
                )
                # Since we didn't run a NOT NULL change before we need to do it
                # now
                for sql, params in null_actions:
                    self.execute(
                        self.sql_alter_column
                        % {
                            "table": self.quote_name(model._meta.db_table),
                            "changes": sql,
                        },
                        params,
                    )
        if post_actions:
            for sql, params in post_actions:
                self.execute(sql, params)
        # If primary_key changed to False, delete the primary key constraint.
        if old_field.primary_key and not new_field.primary_key:
            self._delete_primary_key(model, strict)
        # Added a unique?
        if self._unique_should_be_added(old_field, new_field):
            self.execute(self._create_unique_sql(model, [new_field]))
        # Added an index? Add an index if db_index switched to True or a unique
        # constraint will no longer be used in lieu of an index. The following
        # lines from the truth table show all True cases; the rest are False:
        #
        # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
        # ------------------------------------------------------------------------------
        # False              | False            | True               | False
        # False              | True             | True               | False
        # True               | True             | True               | False
        if (
            (not old_field.db_index or old_field.unique)
            and new_field.db_index
            and not new_field.unique
        ):
            self.execute(self._create_index_sql(model, fields=[new_field]))
        # Type alteration on primary key? Then we need to alter the column
        # referring to us.
        rels_to_update = []
        if drop_foreign_keys:
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Changed to become primary key?
        if self._field_became_primary_key(old_field, new_field):
            # Make the new one
            self.execute(self._create_primary_key_sql(model, new_field))
            # Update all referencing columns
            rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
        # Handle our type alters on the other end of rels from the PK stuff above
        for old_rel, new_rel in rels_to_update:
            rel_db_params = new_rel.field.db_parameters(connection=self.connection)
            rel_type = rel_db_params["type"]
            rel_collation = rel_db_params.get("collation")
            old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
            old_rel_collation = old_rel_db_params.get("collation")
            fragment, other_actions = self._alter_column_type_sql(
                new_rel.related_model,
                old_rel.field,
                new_rel.field,
                rel_type,
                old_rel_collation,
                rel_collation,
            )
            # fragment is a (sql, params) pair for the ALTER statement itself.
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(new_rel.related_model._meta.db_table),
                    "changes": fragment[0],
                },
                fragment[1],
            )
            for sql, params in other_actions:
                self.execute(sql, params)
        # Does it have a foreign key? Create it when one was dropped above,
        # or when the old field had no relation or no database constraint.
        if (
            self.connection.features.supports_foreign_keys
            and new_field.remote_field
            and (
                fks_dropped or not old_field.remote_field or not old_field.db_constraint
            )
            and new_field.db_constraint
        ):
            self.execute(
                self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
            )
        # Rebuild FKs that pointed to us if we previously had to drop them
        if drop_foreign_keys:
            for _, rel in rels_to_update:
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )
        # Does it have check constraints we need to add?
        if old_db_check != new_db_check and new_db_check:
            constraint_name = self._create_index_name(
                model._meta.db_table, [new_field.column], suffix="_check"
            )
            self.execute(
                self._create_check_sql(model, constraint_name, new_db_params["check"])
            )
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults)
        if needs_database_default:
            changes_sql, params = self._alter_column_default_sql(
                model, old_field, new_field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
1110
1111    def _alter_column_null_sql(self, model, old_field, new_field):
1112        """
1113        Hook to specialize column null alteration.
1114
1115        Return a (sql, params) fragment to set a column to null or non-null
1116        as required by new_field, or None if no changes are required.
1117        """
1118        if (
1119            self.connection.features.interprets_empty_strings_as_nulls
1120            and new_field.empty_strings_allowed
1121        ):
1122            # The field is nullable in the database anyway, leave it alone.
1123            return
1124        else:
1125            new_db_params = new_field.db_parameters(connection=self.connection)
1126            sql = (
1127                self.sql_alter_column_null
1128                if new_field.null
1129                else self.sql_alter_column_not_null
1130            )
1131            return (
1132                sql
1133                % {
1134                    "column": self.quote_name(new_field.column),
1135                    "type": new_db_params["type"],
1136                },
1137                [],
1138            )
1139
1140    def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1141        """
1142        Hook to specialize column default alteration.
1143
1144        Return a (sql, params) fragment to add or drop (depending on the drop
1145        argument) a default to new_field's column.
1146        """
1147        new_default = self.effective_default(new_field)
1148        default = self._column_default_sql(new_field)
1149        params = [new_default]
1150
1151        if drop:
1152            params = []
1153        elif self.connection.features.requires_literal_defaults:
1154            # Some databases (Oracle) can't take defaults as a parameter
1155            # If this is the case, the SchemaEditor for that database should
1156            # implement prepare_default().
1157            default = self.prepare_default(new_default)
1158            params = []
1159
1160        new_db_params = new_field.db_parameters(connection=self.connection)
1161        if drop:
1162            if new_field.null:
1163                sql = self.sql_alter_column_no_default_null
1164            else:
1165                sql = self.sql_alter_column_no_default
1166        else:
1167            sql = self.sql_alter_column_default
1168        return (
1169            sql
1170            % {
1171                "column": self.quote_name(new_field.column),
1172                "type": new_db_params["type"],
1173                "default": default,
1174            },
1175            params,
1176        )
1177
1178    def _alter_column_type_sql(
1179        self, model, old_field, new_field, new_type, old_collation, new_collation
1180    ):
1181        """
1182        Hook to specialize column type alteration for different backends,
1183        for cases when a creation type is different to an alteration type
1184        (e.g. SERIAL in PostgreSQL, PostGIS fields).
1185
1186        Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1187        an ALTER TABLE statement and a list of extra (sql, params) tuples to
1188        run once the field is altered.
1189        """
1190        other_actions = []
1191        if collate_sql := self._collate_sql(
1192            new_collation, old_collation, model._meta.db_table
1193        ):
1194            collate_sql = f" {collate_sql}"
1195        else:
1196            collate_sql = ""
1197        # Comment change?
1198        comment_sql = ""
1199        if self.connection.features.supports_comments and not new_field.many_to_many:
1200            if old_field.db_comment != new_field.db_comment:
1201                # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1202                # 'COMMENT ON ...' at the same time.
1203                sql, params = self._alter_column_comment_sql(
1204                    model, new_field, new_type, new_field.db_comment
1205                )
1206                if sql:
1207                    other_actions.append((sql, params))
1208            if new_field.db_comment:
1209                comment_sql = self._comment_sql(new_field.db_comment)
1210        return (
1211            (
1212                self.sql_alter_column_type
1213                % {
1214                    "column": self.quote_name(new_field.column),
1215                    "type": new_type,
1216                    "collation": collate_sql,
1217                    "comment": comment_sql,
1218                },
1219                [],
1220            ),
1221            other_actions,
1222        )
1223
1224    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1225        return (
1226            self.sql_alter_column_comment
1227            % {
1228                "table": self.quote_name(model._meta.db_table),
1229                "column": self.quote_name(new_field.column),
1230                "comment": self._comment_sql(new_db_comment),
1231            },
1232            [],
1233        )
1234
1235    def _comment_sql(self, comment):
1236        return self.quote_value(comment or "")
1237
1238    def _alter_many_to_many(self, model, old_field, new_field, strict):
1239        """Alter M2Ms to repoint their to= endpoints."""
1240        # Rename the through table
1241        if (
1242            old_field.remote_field.through._meta.db_table
1243            != new_field.remote_field.through._meta.db_table
1244        ):
1245            self.alter_db_table(
1246                old_field.remote_field.through,
1247                old_field.remote_field.through._meta.db_table,
1248                new_field.remote_field.through._meta.db_table,
1249            )
1250        # Repoint the FK to the other side
1251        self.alter_field(
1252            new_field.remote_field.through,
1253            # The field that points to the target model is needed, so we can
1254            # tell alter_field to change it - this is m2m_reverse_field_name()
1255            # (as opposed to m2m_field_name(), which points to our model).
1256            old_field.remote_field.through._meta.get_field(
1257                old_field.m2m_reverse_field_name()
1258            ),
1259            new_field.remote_field.through._meta.get_field(
1260                new_field.m2m_reverse_field_name()
1261            ),
1262        )
1263        self.alter_field(
1264            new_field.remote_field.through,
1265            # for self-referential models we need to alter field from the other end too
1266            old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1267            new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1268        )
1269
1270    def _create_index_name(self, table_name, column_names, suffix=""):
1271        """
1272        Generate a unique name for an index/unique constraint.
1273
1274        The name is divided into 3 parts: the table name, the column names,
1275        and a unique digest and suffix.
1276        """
1277        _, table_name = split_identifier(table_name)
1278        hash_suffix_part = "{}{}".format(
1279            names_digest(table_name, *column_names, length=8),
1280            suffix,
1281        )
1282        max_length = self.connection.ops.max_name_length() or 200
1283        # If everything fits into max_length, use that name.
1284        index_name = "{}_{}_{}".format(
1285            table_name, "_".join(column_names), hash_suffix_part
1286        )
1287        if len(index_name) <= max_length:
1288            return index_name
1289        # Shorten a long suffix.
1290        if len(hash_suffix_part) > max_length / 3:
1291            hash_suffix_part = hash_suffix_part[: max_length // 3]
1292        other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1293        index_name = "{}_{}_{}".format(
1294            table_name[:other_length],
1295            "_".join(column_names)[:other_length],
1296            hash_suffix_part,
1297        )
1298        # Prepend D if needed to prevent the name from starting with an
1299        # underscore or a number (not permitted on Oracle).
1300        if index_name[0] == "_" or index_name[0].isdigit():
1301            index_name = "D%s" % index_name[:-1]
1302        return index_name
1303
1304    def _get_index_tablespace_sql(self, model, fields, db_tablespace=None):
1305        if db_tablespace is None:
1306            if len(fields) == 1 and fields[0].db_tablespace:
1307                db_tablespace = fields[0].db_tablespace
1308            elif settings.DEFAULT_INDEX_TABLESPACE:
1309                db_tablespace = settings.DEFAULT_INDEX_TABLESPACE
1310            elif model._meta.db_tablespace:
1311                db_tablespace = model._meta.db_tablespace
1312        if db_tablespace is not None:
1313            return " " + self.connection.ops.tablespace_sql(db_tablespace)
1314        return ""
1315
1316    def _index_condition_sql(self, condition):
1317        if condition:
1318            return " WHERE " + condition
1319        return ""
1320
1321    def _index_include_sql(self, model, columns):
1322        if not columns or not self.connection.features.supports_covering_indexes:
1323            return ""
1324        return Statement(
1325            " INCLUDE (%(columns)s)",
1326            columns=Columns(model._meta.db_table, columns, self.quote_name),
1327        )
1328
1329    def _create_index_sql(
1330        self,
1331        model,
1332        *,
1333        fields=None,
1334        name=None,
1335        suffix="",
1336        using="",
1337        db_tablespace=None,
1338        col_suffixes=(),
1339        sql=None,
1340        opclasses=(),
1341        condition=None,
1342        include=None,
1343        expressions=None,
1344    ):
1345        """
1346        Return the SQL statement to create the index for one or several fields
1347        or expressions. `sql` can be specified if the syntax differs from the
1348        standard (GIS indexes, ...).
1349        """
1350        fields = fields or []
1351        expressions = expressions or []
1352        compiler = Query(model, alias_cols=False).get_compiler(
1353            connection=self.connection,
1354        )
1355        tablespace_sql = self._get_index_tablespace_sql(
1356            model, fields, db_tablespace=db_tablespace
1357        )
1358        columns = [field.column for field in fields]
1359        sql_create_index = sql or self.sql_create_index
1360        table = model._meta.db_table
1361
1362        def create_index_name(*args, **kwargs):
1363            nonlocal name
1364            if name is None:
1365                name = self._create_index_name(*args, **kwargs)
1366            return self.quote_name(name)
1367
1368        return Statement(
1369            sql_create_index,
1370            table=Table(table, self.quote_name),
1371            name=IndexName(table, columns, suffix, create_index_name),
1372            using=using,
1373            columns=(
1374                self._index_columns(table, columns, col_suffixes, opclasses)
1375                if columns
1376                else Expressions(table, expressions, compiler, self.quote_value)
1377            ),
1378            extra=tablespace_sql,
1379            condition=self._index_condition_sql(condition),
1380            include=self._index_include_sql(model, include),
1381        )
1382
1383    def _delete_index_sql(self, model, name, sql=None):
1384        return Statement(
1385            sql or self.sql_delete_index,
1386            table=Table(model._meta.db_table, self.quote_name),
1387            name=self.quote_name(name),
1388        )
1389
1390    def _rename_index_sql(self, model, old_name, new_name):
1391        return Statement(
1392            self.sql_rename_index,
1393            table=Table(model._meta.db_table, self.quote_name),
1394            old_name=self.quote_name(old_name),
1395            new_name=self.quote_name(new_name),
1396        )
1397
1398    def _index_columns(self, table, columns, col_suffixes, opclasses):
1399        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1400
1401    def _model_indexes_sql(self, model):
1402        """
1403        Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1404        """
1405        if not model._meta.managed or model._meta.swapped:
1406            return []
1407        output = []
1408        for field in model._meta.local_fields:
1409            output.extend(self._field_indexes_sql(model, field))
1410
1411        for index in model._meta.indexes:
1412            if (
1413                not index.contains_expressions
1414                or self.connection.features.supports_expression_indexes
1415            ):
1416                output.append(index.create_sql(model, self))
1417        return output
1418
1419    def _field_indexes_sql(self, model, field):
1420        """
1421        Return a list of all index SQL statements for the specified field.
1422        """
1423        output = []
1424        if self._field_should_be_indexed(model, field):
1425            output.append(self._create_index_sql(model, fields=[field]))
1426        return output
1427
1428    def _field_should_be_altered(self, old_field, new_field, ignore=None):
1429        ignore = ignore or set()
1430        _, old_path, old_args, old_kwargs = old_field.deconstruct()
1431        _, new_path, new_args, new_kwargs = new_field.deconstruct()
1432        # Don't alter when:
1433        # - changing only a field name
1434        # - changing an attribute that doesn't affect the schema
1435        # - changing an attribute in the provided set of ignored attributes
1436        # - adding only a db_column and the column name is not changed
1437        for attr in ignore.union(old_field.non_db_attrs):
1438            old_kwargs.pop(attr, None)
1439        for attr in ignore.union(new_field.non_db_attrs):
1440            new_kwargs.pop(attr, None)
1441        return self.quote_name(old_field.column) != self.quote_name(
1442            new_field.column
1443        ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1444
1445    def _field_should_be_indexed(self, model, field):
1446        return field.db_index and not field.unique
1447
1448    def _field_became_primary_key(self, old_field, new_field):
1449        return not old_field.primary_key and new_field.primary_key
1450
1451    def _unique_should_be_added(self, old_field, new_field):
1452        return (
1453            not new_field.primary_key
1454            and new_field.unique
1455            and (not old_field.unique or old_field.primary_key)
1456        )
1457
1458    def _rename_field_sql(self, table, old_field, new_field, new_type):
1459        return self.sql_rename_column % {
1460            "table": self.quote_name(table),
1461            "old_column": self.quote_name(old_field.column),
1462            "new_column": self.quote_name(new_field.column),
1463            "type": new_type,
1464        }
1465
1466    def _create_fk_sql(self, model, field, suffix):
1467        table = Table(model._meta.db_table, self.quote_name)
1468        name = self._fk_constraint_name(model, field, suffix)
1469        column = Columns(model._meta.db_table, [field.column], self.quote_name)
1470        to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1471        to_column = Columns(
1472            field.target_field.model._meta.db_table,
1473            [field.target_field.column],
1474            self.quote_name,
1475        )
1476        deferrable = self.connection.ops.deferrable_sql()
1477        return Statement(
1478            self.sql_create_fk,
1479            table=table,
1480            name=name,
1481            column=column,
1482            to_table=to_table,
1483            to_column=to_column,
1484            deferrable=deferrable,
1485        )
1486
1487    def _fk_constraint_name(self, model, field, suffix):
1488        def create_fk_name(*args, **kwargs):
1489            return self.quote_name(self._create_index_name(*args, **kwargs))
1490
1491        return ForeignKeyName(
1492            model._meta.db_table,
1493            [field.column],
1494            split_identifier(field.target_field.model._meta.db_table)[1],
1495            [field.target_field.column],
1496            suffix,
1497            create_fk_name,
1498        )
1499
1500    def _delete_fk_sql(self, model, name):
1501        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1502
1503    def _deferrable_constraint_sql(self, deferrable):
1504        if deferrable is None:
1505            return ""
1506        if deferrable == Deferrable.DEFERRED:
1507            return " DEFERRABLE INITIALLY DEFERRED"
1508        if deferrable == Deferrable.IMMEDIATE:
1509            return " DEFERRABLE INITIALLY IMMEDIATE"
1510
1511    def _unique_sql(
1512        self,
1513        model,
1514        fields,
1515        name,
1516        condition=None,
1517        deferrable=None,
1518        include=None,
1519        opclasses=None,
1520        expressions=None,
1521    ):
1522        if (
1523            deferrable
1524            and not self.connection.features.supports_deferrable_unique_constraints
1525        ):
1526            return None
1527        if condition or include or opclasses or expressions:
1528            # Databases support conditional, covering, and functional unique
1529            # constraints via a unique index.
1530            sql = self._create_unique_sql(
1531                model,
1532                fields,
1533                name=name,
1534                condition=condition,
1535                include=include,
1536                opclasses=opclasses,
1537                expressions=expressions,
1538            )
1539            if sql:
1540                self.deferred_sql.append(sql)
1541            return None
1542        constraint = self.sql_unique_constraint % {
1543            "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1544            "deferrable": self._deferrable_constraint_sql(deferrable),
1545        }
1546        return self.sql_constraint % {
1547            "name": self.quote_name(name),
1548            "constraint": constraint,
1549        }
1550
1551    def _create_unique_sql(
1552        self,
1553        model,
1554        fields,
1555        name=None,
1556        condition=None,
1557        deferrable=None,
1558        include=None,
1559        opclasses=None,
1560        expressions=None,
1561    ):
1562        if (
1563            (
1564                deferrable
1565                and not self.connection.features.supports_deferrable_unique_constraints
1566            )
1567            or (condition and not self.connection.features.supports_partial_indexes)
1568            or (include and not self.connection.features.supports_covering_indexes)
1569            or (
1570                expressions and not self.connection.features.supports_expression_indexes
1571            )
1572        ):
1573            return None
1574
1575        compiler = Query(model, alias_cols=False).get_compiler(
1576            connection=self.connection
1577        )
1578        table = model._meta.db_table
1579        columns = [field.column for field in fields]
1580        if name is None:
1581            name = self._unique_constraint_name(table, columns, quote=True)
1582        else:
1583            name = self.quote_name(name)
1584        if condition or include or opclasses or expressions:
1585            sql = self.sql_create_unique_index
1586        else:
1587            sql = self.sql_create_unique
1588        if columns:
1589            columns = self._index_columns(
1590                table, columns, col_suffixes=(), opclasses=opclasses
1591            )
1592        else:
1593            columns = Expressions(table, expressions, compiler, self.quote_value)
1594        return Statement(
1595            sql,
1596            table=Table(table, self.quote_name),
1597            name=name,
1598            columns=columns,
1599            condition=self._index_condition_sql(condition),
1600            deferrable=self._deferrable_constraint_sql(deferrable),
1601            include=self._index_include_sql(model, include),
1602        )
1603
1604    def _unique_constraint_name(self, table, columns, quote=True):
1605        if quote:
1606
1607            def create_unique_name(*args, **kwargs):
1608                return self.quote_name(self._create_index_name(*args, **kwargs))
1609
1610        else:
1611            create_unique_name = self._create_index_name
1612
1613        return IndexName(table, columns, "_uniq", create_unique_name)
1614
1615    def _delete_unique_sql(
1616        self,
1617        model,
1618        name,
1619        condition=None,
1620        deferrable=None,
1621        include=None,
1622        opclasses=None,
1623        expressions=None,
1624    ):
1625        if (
1626            (
1627                deferrable
1628                and not self.connection.features.supports_deferrable_unique_constraints
1629            )
1630            or (condition and not self.connection.features.supports_partial_indexes)
1631            or (include and not self.connection.features.supports_covering_indexes)
1632            or (
1633                expressions and not self.connection.features.supports_expression_indexes
1634            )
1635        ):
1636            return None
1637        if condition or include or opclasses or expressions:
1638            sql = self.sql_delete_index
1639        else:
1640            sql = self.sql_delete_unique
1641        return self._delete_constraint_sql(sql, model, name)
1642
1643    def _check_sql(self, name, check):
1644        return self.sql_constraint % {
1645            "name": self.quote_name(name),
1646            "constraint": self.sql_check_constraint % {"check": check},
1647        }
1648
1649    def _create_check_sql(self, model, name, check):
1650        if not self.connection.features.supports_table_check_constraints:
1651            return None
1652        return Statement(
1653            self.sql_create_check,
1654            table=Table(model._meta.db_table, self.quote_name),
1655            name=self.quote_name(name),
1656            check=check,
1657        )
1658
1659    def _delete_check_sql(self, model, name):
1660        if not self.connection.features.supports_table_check_constraints:
1661            return None
1662        return self._delete_constraint_sql(self.sql_delete_check, model, name)
1663
1664    def _delete_constraint_sql(self, template, model, name):
1665        return Statement(
1666            template,
1667            table=Table(model._meta.db_table, self.quote_name),
1668            name=self.quote_name(name),
1669        )
1670
1671    def _constraint_names(
1672        self,
1673        model,
1674        column_names=None,
1675        unique=None,
1676        primary_key=None,
1677        index=None,
1678        foreign_key=None,
1679        check=None,
1680        type_=None,
1681        exclude=None,
1682    ):
1683        """Return all constraint names matching the columns and conditions."""
1684        if column_names is not None:
1685            column_names = [
1686                self.connection.introspection.identifier_converter(
1687                    truncate_name(name, self.connection.ops.max_name_length())
1688                )
1689                if self.connection.features.truncates_names
1690                else self.connection.introspection.identifier_converter(name)
1691                for name in column_names
1692            ]
1693        with self.connection.cursor() as cursor:
1694            constraints = self.connection.introspection.get_constraints(
1695                cursor, model._meta.db_table
1696            )
1697        result = []
1698        for name, infodict in constraints.items():
1699            if column_names is None or column_names == infodict["columns"]:
1700                if unique is not None and infodict["unique"] != unique:
1701                    continue
1702                if primary_key is not None and infodict["primary_key"] != primary_key:
1703                    continue
1704                if index is not None and infodict["index"] != index:
1705                    continue
1706                if check is not None and infodict["check"] != check:
1707                    continue
1708                if foreign_key is not None and not infodict["foreign_key"]:
1709                    continue
1710                if type_ is not None and infodict["type"] != type_:
1711                    continue
1712                if not exclude or name not in exclude:
1713                    result.append(name)
1714        return result
1715
1716    def _delete_primary_key(self, model, strict=False):
1717        constraint_names = self._constraint_names(model, primary_key=True)
1718        if strict and len(constraint_names) != 1:
1719            raise ValueError(
1720                "Found wrong number ({}) of PK constraints for {}".format(
1721                    len(constraint_names),
1722                    model._meta.db_table,
1723                )
1724            )
1725        for constraint_name in constraint_names:
1726            self.execute(self._delete_primary_key_sql(model, constraint_name))
1727
1728    def _create_primary_key_sql(self, model, field):
1729        return Statement(
1730            self.sql_create_pk,
1731            table=Table(model._meta.db_table, self.quote_name),
1732            name=self.quote_name(
1733                self._create_index_name(
1734                    model._meta.db_table, [field.column], suffix="_pk"
1735                )
1736            ),
1737            columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1738        )
1739
1740    def _delete_primary_key_sql(self, model, name):
1741        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1742
1743    def _collate_sql(self, collation, old_collation=None, table_name=None):
1744        return "COLLATE " + self.quote_name(collation) if collation else ""
1745
1746    def remove_procedure(self, procedure_name, param_types=()):
1747        sql = self.sql_delete_procedure % {
1748            "procedure": self.quote_name(procedure_name),
1749            "param_types": ",".join(param_types),
1750        }
1751        self.execute(sql)