Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import copy
   4import warnings
   5from collections.abc import Iterable, Iterator, Sequence
   6from itertools import chain
   7from typing import TYPE_CHECKING, Any
   8
   9if TYPE_CHECKING:
  10    from plain.models.meta import Meta
  11    from plain.models.options import Options
  12
  13import plain.runtime
  14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
  15from plain.models import models_registry, transaction
  16from plain.models.constants import LOOKUP_SEP
  17from plain.models.constraints import CheckConstraint, UniqueConstraint
  18from plain.models.db import (
  19    PLAIN_VERSION_PICKLE_KEY,
  20    DatabaseError,
  21    db_connection,
  22)
  23from plain.models.deletion import Collector
  24from plain.models.exceptions import (
  25    DoesNotExistDescriptor,
  26    FieldDoesNotExist,
  27    MultipleObjectsReturnedDescriptor,
  28)
  29from plain.models.expressions import RawSQL, Value
  30from plain.models.fields import NOT_PROVIDED, PrimaryKeyField
  31from plain.models.fields.reverse_related import ForeignObjectRel
  32from plain.models.meta import Meta
  33from plain.models.options import Options
  34from plain.models.query import F, Q, QuerySet
  35from plain.preflight import PreflightResult
  36from plain.utils.encoding import force_str
  37from plain.utils.hashable import make_hashable
  38
  39
  40class Deferred:
  41    def __repr__(self) -> str:
  42        return "<Deferred field>"
  43
  44    def __str__(self) -> str:
  45        return "<Deferred field>"
  46
  47
# Module-level sentinel. Model.from_db() substitutes it for every concrete
# field that was not selected, and Model.__init__() skips assigning any value
# that is this object (compared by identity).
DEFERRED = Deferred()
  49
  50
  51class ModelBase(type):
  52    """Metaclass for all models."""
  53
  54    def __new__(
  55        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
  56    ) -> type:
  57        # Don't do any of this for the root models.Model class.
  58        if not bases:
  59            return super().__new__(cls, name, bases, attrs)
  60
  61        for base in bases:
  62            # Models are required to directly inherit from model.Model, not a subclass of it.
  63            if issubclass(base, Model) and base is not Model:
  64                raise TypeError(
  65                    f"A model can't extend another model: {name} extends {base}"
  66                )
  67
  68        return super().__new__(cls, name, bases, attrs, **kwargs)
  69
  70
  71class ModelStateFieldsCacheDescriptor:
  72    def __get__(
  73        self, instance: ModelState | None, cls: type | None = None
  74    ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
  75        if instance is None:
  76            return self
  77        res = instance.fields_cache = {}
  78        return res
  79
  80
class ModelState:
    """
    Store model instance state.

    One instance is created per model object in Model.__init__() and kept on
    it as ``_state``.
    """

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    # Set to False by Model.from_db() and at the end of Model.save_base().
    adding = True
    # Lazily replaced by a per-instance dict on first access (see
    # ModelStateFieldsCacheDescriptor.__get__).
    fields_cache = ModelStateFieldsCacheDescriptor()
  90
  91
class Model(metaclass=ModelBase):
    """
    Base class for models.

    ModelBase raises TypeError for any class whose bases include a subclass
    of Model, so models extend this class directly.
    """

    # Every model gets an automatic id field
    id = PrimaryKeyField()

    # Descriptors for other model behavior
    # `query` is the queryset entry point (used as `model_class.query.filter(...)`).
    query = QuerySet()
    # `model_options` exposes package_label / object_name / model_name.
    model_options = Options()
    # `_model_meta` exposes field metadata (concrete_fields, get_field(), ...).
    _model_meta = Meta()
    DoesNotExist = DoesNotExistDescriptor()
    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
 102
    def __init__(self, *args: Any, **kwargs: Any):
        """
        Populate the instance's field attributes from args and/or kwargs.

        Positional values are matched, in order, against the model's fields.
        Keyword values are matched by field attname, or by field name for a
        field whose remote_field is a ForeignObjectRel. Fields without a
        supplied value receive ``field.get_default()``. Any value that is the
        DEFERRED sentinel is skipped, leaving that attribute unset.

        Raises:
            IndexError: more positional values than concrete fields.
            TypeError: a field was given both positionally and by keyword, or
                a keyword matches neither a property nor a field.
        """
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        meta = cls._model_meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work
        # The reason for the kwargs check is that standard iterator passes in by
        # args, and instantiation for iteration is 33% faster.
        if len(args) > len(meta.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            # Fast path: positional values only, matched against concrete fields.
            fields_iter = iter(meta.concrete_fields)
            # Argument order to zip() matters: zip() stops as soon as `args`
            # is exhausted, *before* pulling another item from `fields_iter`.
            # That leaves `fields_iter` positioned at the first field without
            # a positional value, which the loop further down relies on.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version: iterates meta.fields and rejects a
            # field supplied both positionally and by keyword.
            fields_iter = iter(meta.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    # Note: a deferred positional value skips the duplicate
                    # keyword check below as well.
                    continue
                _setattr(self, field.attname, val)
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field: no column and no value supplied, nothing to set.
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            # Whatever is left was not consumed by the field loop above.
            property_names = meta._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        meta.get_field(prop)
                    except FieldDoesNotExist:
                        # Collect every unknown name so they are reported together.
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()
 211
 212    @classmethod
 213    def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
 214        if len(values) != len(cls._model_meta.concrete_fields):
 215            values_iter = iter(values)
 216            values = [
 217                next(values_iter) if f.attname in field_names else DEFERRED
 218                for f in cls._model_meta.concrete_fields
 219            ]
 220        new = cls(*values)
 221        new._state.adding = False
 222        return new
 223
 224    def __repr__(self) -> str:
 225        return f"<{self.__class__.__name__}: {self}>"
 226
 227    def __str__(self) -> str:
 228        return f"{self.__class__.__name__} object ({self.id})"
 229
 230    def __eq__(self, other: object) -> bool:
 231        if not isinstance(other, Model):
 232            return NotImplemented
 233        if self.__class__ != other.__class__:
 234            return False
 235        my_id = self.id
 236        if my_id is None:
 237            return self is other
 238        return my_id == other.id
 239
 240    def __hash__(self) -> int:
 241        if self.id is None:
 242            raise TypeError("Model instances without primary key value are unhashable")
 243        return hash(self.id)
 244
 245    def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
 246        data = self.__getstate__()
 247        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
 248        class_id = (
 249            self.model_options.package_label,
 250            self.model_options.object_name,
 251        )
 252        return model_unpickle, (class_id,), data
 253
 254    def __getstate__(self) -> dict[str, Any]:
 255        """Hook to allow choosing the attributes to pickle."""
 256        state = self.__dict__.copy()
 257        state["_state"] = copy.copy(state["_state"])
 258        state["_state"].fields_cache = state["_state"].fields_cache.copy()
 259        # memoryview cannot be pickled, so cast it to bytes and store
 260        # separately.
 261        _memoryview_attrs = []
 262        for attr, value in state.items():
 263            if isinstance(value, memoryview):
 264                _memoryview_attrs.append((attr, bytes(value)))
 265        if _memoryview_attrs:
 266            state["_memoryview_attrs"] = _memoryview_attrs
 267            for attr, value in _memoryview_attrs:
 268                state.pop(attr)
 269        return state
 270
 271    def __setstate__(self, state: dict[str, Any]) -> None:
 272        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
 273        if pickled_version:
 274            if pickled_version != plain.runtime.__version__:
 275                warnings.warn(
 276                    f"Pickled model instance's Plain version {pickled_version} does not "
 277                    f"match the current version {plain.runtime.__version__}.",
 278                    RuntimeWarning,
 279                    stacklevel=2,
 280                )
 281        else:
 282            warnings.warn(
 283                "Pickled model instance's Plain version is not specified.",
 284                RuntimeWarning,
 285                stacklevel=2,
 286            )
 287        if "_memoryview_attrs" in state:
 288            for attr, value in state.pop("_memoryview_attrs"):
 289                state[attr] = memoryview(value)
 290        self.__dict__.update(state)
 291
 292    def get_deferred_fields(self) -> set[str]:
 293        """
 294        Return a set containing names of deferred fields for this instance.
 295        """
 296        return {
 297            f.attname
 298            for f in self._model_meta.concrete_fields
 299            if f.attname not in self.__dict__
 300        }
 301
 302    def refresh_from_db(self, fields: list[str] | None = None) -> None:
 303        """
 304        Reload field values from the database.
 305
 306        By default, the reloading happens from the database this instance was
 307        loaded from, or by the read router if this instance wasn't loaded from
 308        any database. The using parameter will override the default.
 309
 310        Fields can be used to specify which fields to reload. The fields
 311        should be an iterable of field attnames. If fields is None, then
 312        all non-deferred fields are reloaded.
 313
 314        When accessing deferred fields of an instance, the deferred loading
 315        of the field will call this method.
 316        """
 317        if fields is None:
 318            self._prefetched_objects_cache = {}
 319        else:
 320            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
 321            for field in fields:
 322                if field in prefetched_objects_cache:
 323                    del prefetched_objects_cache[field]  # type: ignore[misc]
 324                    fields.remove(field)
 325            if not fields:
 326                return
 327            if any(LOOKUP_SEP in f for f in fields):
 328                raise ValueError(
 329                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
 330                    "are not allowed in fields."
 331                )
 332
 333        db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
 334
 335        # Use provided fields, if not set then reload all non-deferred fields.
 336        deferred_fields = self.get_deferred_fields()
 337        if fields is not None:
 338            fields = list(fields)
 339            db_instance_qs = db_instance_qs.only(*fields)
 340        elif deferred_fields:
 341            fields = [
 342                f.attname
 343                for f in self._model_meta.concrete_fields
 344                if f.attname not in deferred_fields
 345            ]
 346            db_instance_qs = db_instance_qs.only(*fields)
 347
 348        db_instance = db_instance_qs.get()
 349        non_loaded_fields = db_instance.get_deferred_fields()
 350        for field in self._model_meta.concrete_fields:
 351            if field.attname in non_loaded_fields:
 352                # This field wasn't refreshed - skip ahead.
 353                continue
 354            setattr(self, field.attname, getattr(db_instance, field.attname))
 355            # Clear cached foreign keys.
 356            if field.is_relation and field.is_cached(self):
 357                field.delete_cached_value(self)
 358
 359        # Clear cached relations.
 360        for field in self._model_meta.related_objects:
 361            if field.is_cached(self):
 362                field.delete_cached_value(self)
 363
 364    def serializable_value(self, field_name: str) -> Any:
 365        """
 366        Return the value of the field name for this instance. If the field is
 367        a foreign key, return the id value instead of the object. If there's
 368        no Field object with this name on the model, return the model
 369        attribute's value.
 370
 371        Used to serialize a field's value (in the serializer, or form output,
 372        for example). Normally, you would just access the attribute directly
 373        and not use this method.
 374        """
 375        try:
 376            field = self._model_meta.get_field(field_name)
 377        except FieldDoesNotExist:
 378            return getattr(self, field_name)
 379        return getattr(self, field.attname)
 380
 381    def save(
 382        self,
 383        *,
 384        clean_and_validate: bool = True,
 385        force_insert: bool = False,
 386        force_update: bool = False,
 387        update_fields: Iterable[str] | None = None,
 388    ) -> None:
 389        """
 390        Save the current instance. Override this in a subclass if you want to
 391        control the saving process.
 392
 393        The 'force_insert' and 'force_update' parameters can be used to insist
 394        that the "save" must be an SQL insert or update (or equivalent for
 395        non-SQL backends), respectively. Normally, they should not be set.
 396        """
 397        self._prepare_related_fields_for_save(operation_name="save")
 398
 399        if force_insert and (force_update or update_fields):
 400            raise ValueError("Cannot force both insert and updating in model saving.")
 401
 402        deferred_fields = self.get_deferred_fields()
 403        if update_fields is not None:
 404            # If update_fields is empty, skip the save. We do also check for
 405            # no-op saves later on for inheritance cases. This bailout is
 406            # still needed for skipping signal sending.
 407            if not update_fields:
 408                return
 409
 410            update_fields = frozenset(update_fields)
 411            field_names = self._model_meta._non_pk_concrete_field_names
 412            non_model_fields = update_fields.difference(field_names)
 413
 414            if non_model_fields:
 415                raise ValueError(
 416                    "The following fields do not exist in this model, are m2m "
 417                    "fields, or are non-concrete fields: {}".format(
 418                        ", ".join(non_model_fields)
 419                    )
 420                )
 421
 422        # If this model is deferred, automatically do an "update_fields" save
 423        # on the loaded fields.
 424        elif not force_insert and deferred_fields:
 425            field_names = set()
 426            for field in self._model_meta.concrete_fields:
 427                if not field.primary_key and not hasattr(field, "through"):
 428                    field_names.add(field.attname)
 429            loaded_fields = field_names.difference(deferred_fields)
 430            if loaded_fields:
 431                update_fields = frozenset(loaded_fields)
 432
 433        if clean_and_validate:
 434            self.full_clean(exclude=deferred_fields)
 435
 436        self.save_base(
 437            force_insert=force_insert,
 438            force_update=force_update,
 439            update_fields=update_fields,
 440        )
 441
 442    def save_base(
 443        self,
 444        *,
 445        raw: bool = False,
 446        force_insert: bool = False,
 447        force_update: bool = False,
 448        update_fields: Iterable[str] | None = None,
 449    ) -> None:
 450        """
 451        Handle the parts of saving which should be done only once per save,
 452        yet need to be done in raw saves, too. This includes some sanity
 453        checks and signal sending.
 454
 455        The 'raw' argument is telling save_base not to save any parent
 456        models and not to do any changes to the values before save. This
 457        is used by fixture loading.
 458        """
 459        assert not (force_insert and (force_update or update_fields))
 460        assert update_fields is None or update_fields
 461        cls = self.__class__
 462
 463        with transaction.mark_for_rollback_on_error():
 464            self._save_table(
 465                raw=raw,
 466                cls=cls,
 467                force_insert=force_insert,
 468                force_update=force_update,
 469                update_fields=update_fields,
 470            )
 471        # Once saved, this is no longer a to-be-added instance.
 472        self._state.adding = False
 473
    def _save_table(
        self,
        *,
        raw: bool,
        cls: type[Model],
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> bool:
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.

        An UPDATE is attempted first when an id is available and an insert is
        not forced; if no row was updated, an INSERT follows.

        Returns:
            True if an UPDATE affected a row, False if an INSERT was done.

        Raises:
            ValueError: an update was requested but no id value is available.
            DatabaseError: ``force_update`` or ``update_fields`` was given and
                the UPDATE affected no rows.
        """
        meta = cls._model_meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            # Restrict to the requested fields, matched by name or attname.
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            # Give the id field a chance to supply a value, and store the
            # result on the instance.
            id_field = meta.get_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_field("id").default
            and meta.get_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            # (field, None, value) triples; with raw=True the attribute value
            # is used as-is, otherwise it comes from field.pre_save().
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                # No id value available: leave the id field out of the INSERT.
                id_field = meta.get_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                # Copy the values returned for the first row onto the instance.
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated
 546
 547    def _do_update(
 548        self,
 549        base_qs: QuerySet,
 550        id_val: Any,
 551        values: list[tuple[Any, Any, Any]],
 552        update_fields: Iterable[str] | None,
 553        forced_update: bool,
 554    ) -> bool:
 555        """
 556        Try to update the model. Return True if the model was updated (if an
 557        update query was done and a matching row was found in the DB).
 558        """
 559        filtered = base_qs.filter(id=id_val)
 560        if not values:
 561            # We can end up here when saving a model in inheritance chain where
 562            # update_fields doesn't target any field in current model. In that
 563            # case we just say the update succeeded. Another case ending up here
 564            # is a model with just PK - in that case check that the PK still
 565            # exists.
 566            return update_fields is not None or filtered.exists()
 567        return filtered._update(values) > 0
 568
 569    def _do_insert(
 570        self,
 571        manager: QuerySet,
 572        fields: Sequence[Any],
 573        returning_fields: Sequence[Any],
 574        raw: bool,
 575    ) -> list[Any]:
 576        """
 577        Do an INSERT. If returning_fields is defined then this method should
 578        return the newly created data for the model.
 579        """
 580        return manager._insert(  # type: ignore[return-value, arg-type]
 581            [self],
 582            fields=fields,  # type: ignore[arg-type]
 583            returning_fields=returning_fields,  # type: ignore[arg-type]
 584            raw=raw,
 585        )
 586
 587    def _prepare_related_fields_for_save(
 588        self, operation_name: str, fields: Sequence[Any] | None = None
 589    ) -> None:
 590        # Ensure that a model instance without a PK hasn't been assigned to
 591        # a ForeignKey on this model. If the field is nullable, allowing the save would result in silent data loss.
 592        for field in self._model_meta.concrete_fields:
 593            if fields and field not in fields:
 594                continue
 595            # If the related field isn't cached, then an instance hasn't been
 596            # assigned and there's no need to worry about this check.
 597            if field.is_relation and field.is_cached(self):
 598                obj = getattr(self, field.name, None)
 599                if not obj:
 600                    continue
 601                # A pk may have been assigned manually to a model instance not
 602                # saved to the database (or auto-generated in a case like
 603                # UUIDField), but we allow the save to proceed and rely on the
 604                # database to raise an IntegrityError if applicable. If
 605                # constraints aren't supported by the database, there's the
 606                # unavoidable risk of data corruption.
 607                if obj.id is None:
 608                    # Remove the object from a related instance cache.
 609                    if not field.remote_field.multiple:
 610                        field.remote_field.delete_cached_value(obj)
 611                    raise ValueError(
 612                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
 613                        f"related object '{field.name}'."
 614                    )
 615                elif getattr(self, field.attname) in field.empty_values:
 616                    # Set related object if it has been saved after an
 617                    # assignment.
 618                    setattr(self, field.name, obj)
 619                # If the relationship's pk/to_field was changed, clear the
 620                # cached relationship.
 621                if getattr(obj, field.target_field.attname) != getattr(
 622                    self, field.attname
 623                ):
 624                    field.delete_cached_value(self)
 625
 626    def delete(self) -> tuple[int, dict[str, int]]:
 627        if self.id is None:
 628            raise ValueError(
 629                f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
 630                "to None."
 631            )
 632        collector = Collector(origin=self)
 633        collector.collect([self])
 634        return collector.delete()
 635
 636    def get_field_display(self, field_name: str) -> str:
 637        """Get the display value for a field, especially useful for fields with choices."""
 638        # Get the field object from the field name
 639        field = self._model_meta.get_field(field_name)
 640        value = getattr(self, field.attname)
 641
 642        # If field has no choices, just return the value as string
 643        if not hasattr(field, "flatchoices") or not field.flatchoices:
 644            return force_str(value, strings_only=True)
 645
 646        # For fields with choices, look up the display value
 647        choices_dict = dict(make_hashable(field.flatchoices))
 648        return force_str(
 649            choices_dict.get(make_hashable(value), value), strings_only=True
 650        )
 651
 652    def _get_field_value_map(
 653        self, meta: Meta | None, exclude: set[str] | None = None
 654    ) -> dict[str, Value]:
 655        if exclude is None:
 656            exclude = set()
 657        meta = meta or self._model_meta
 658        return {
 659            field.name: Value(getattr(self, field.attname), field)
 660            for field in meta.local_concrete_fields
 661            if field.name not in exclude
 662        }
 663
 664    def prepare_database_save(self, field: Any) -> Any:
 665        if self.id is None:
 666            raise ValueError(
 667                f"Unsaved model instance {self!r} cannot be used in an ORM query."
 668            )
 669        return getattr(self, field.remote_field.get_related_field().attname)
 670
 671    def clean(self) -> None:
 672        """
 673        Hook for doing any extra model-wide validation after clean() has been
 674        called on every field by self.clean_fields. Any ValidationError raised
 675        by this method will not be associated with a particular field; it will
 676        have a special-case association with the field defined by NON_FIELD_ERRORS.
 677        """
 678        pass
 679
 680    def validate_unique(self, exclude: set[str] | None = None) -> None:
 681        """
 682        Check unique constraints on the model and raise ValidationError if any
 683        failed.
 684        """
 685        unique_checks = self._get_unique_checks(exclude=exclude)
 686
 687        if errors := self._perform_unique_checks(unique_checks):
 688            raise ValidationError(errors)
 689
 690    def _get_unique_checks(
 691        self, exclude: set[str] | None = None
 692    ) -> list[tuple[type, tuple[str, ...]]]:
 693        """
 694        Return a list of checks to perform. Since validate_unique() could be
 695        called from a ModelForm, some fields may have been excluded; we can't
 696        perform a unique check on a model that is missing fields involved
 697        in that check. Fields that did not validate should also be excluded,
 698        but they need to be passed in via the exclude argument.
 699        """
 700        if exclude is None:
 701            exclude = set()
 702        unique_checks = []
 703
 704        # Gather a list of checks for fields declared as unique and add them to
 705        # the list of checks.
 706
 707        fields_with_class = [(self.__class__, self._model_meta.local_fields)]
 708
 709        for model_class, fields in fields_with_class:
 710            for f in fields:
 711                name = f.name
 712                if name in exclude:
 713                    continue
 714                if f.primary_key:
 715                    unique_checks.append((model_class, (name,)))
 716
 717        return unique_checks
 718
 719    def _perform_unique_checks(
 720        self, unique_checks: list[tuple[type, tuple[str, ...]]]
 721    ) -> dict[str, list[ValidationError]]:
 722        errors = {}
 723
 724        for model_class, unique_check in unique_checks:
 725            # Try to look up an existing object with the same values as this
 726            # object's values for all the unique field.
 727
 728            lookup_kwargs = {}
 729            for field_name in unique_check:
 730                f = self._model_meta.get_field(field_name)
 731                lookup_value = getattr(self, f.attname)
 732                # TODO: Handle multiple backends with different feature flags.
 733                if lookup_value is None:
 734                    # no value, skip the lookup
 735                    continue
 736                if f.primary_key and not self._state.adding:
 737                    # no need to check for unique primary key when editing
 738                    continue
 739                lookup_kwargs[str(field_name)] = lookup_value
 740
 741            # some fields were skipped, no reason to do the check
 742            if len(unique_check) != len(lookup_kwargs):
 743                continue
 744
 745            qs = model_class.query.filter(**lookup_kwargs)  # type: ignore[attr-defined]
 746
 747            # Exclude the current object from the query if we are editing an
 748            # instance (as opposed to creating a new one)
 749            # Use the primary key defined by model_class. In previous versions
 750            # this could differ from `self.id` due to model inheritance.
 751            model_class_id = getattr(self, "id")
 752            if not self._state.adding and model_class_id is not None:
 753                qs = qs.exclude(id=model_class_id)
 754            if qs.exists():
 755                if len(unique_check) == 1:
 756                    key = unique_check[0]
 757                else:
 758                    key = NON_FIELD_ERRORS
 759                errors.setdefault(key, []).append(
 760                    self.unique_error_message(model_class, unique_check)
 761                )
 762
 763        return errors
 764
 765    def unique_error_message(
 766        self, model_class: type[Model], unique_check: tuple[str, ...]
 767    ) -> ValidationError:
 768        meta = model_class._model_meta
 769
 770        params = {
 771            "model": self,
 772            "model_class": model_class,
 773            "model_name": model_class.model_options.model_name,
 774            "unique_check": unique_check,
 775        }
 776
 777        if len(unique_check) == 1:
 778            field = meta.get_field(unique_check[0])
 779            params["field_label"] = field.name
 780            return ValidationError(
 781                message=field.error_messages["unique"],
 782                code="unique",
 783                params=params,
 784            )
 785        else:
 786            field_names = [meta.get_field(f).name for f in unique_check]
 787
 788            # Put an "and" before the last one
 789            field_names[-1] = f"and {field_names[-1]}"
 790
 791            if len(field_names) > 2:
 792                # Comma join if more than 2
 793                params["field_label"] = ", ".join(field_names)
 794            else:
 795                # Just a space if there are only 2
 796                params["field_label"] = " ".join(field_names)
 797
 798            # Use the first field as the message format...
 799            message = meta.get_field(unique_check[0]).error_messages["unique"]
 800
 801            return ValidationError(
 802                message=message,
 803                code="unique",
 804                params=params,
 805            )
 806
 807    def get_constraints(self) -> list[tuple[type, list[Any]]]:
 808        constraints = [(self.__class__, self.model_options.constraints)]
 809        return constraints
 810
 811    def validate_constraints(self, exclude: set[str] | None = None) -> None:
 812        constraints = self.get_constraints()
 813
 814        errors = {}
 815        for model_class, model_constraints in constraints:
 816            for constraint in model_constraints:
 817                try:
 818                    constraint.validate(model_class, self, exclude=exclude)
 819                except ValidationError as e:
 820                    if (
 821                        getattr(e, "code", None) == "unique"
 822                        and len(constraint.fields) == 1
 823                    ):
 824                        errors.setdefault(constraint.fields[0], []).append(e)
 825                    else:
 826                        errors = e.update_error_dict(errors)
 827        if errors:
 828            raise ValidationError(errors)
 829
 830    def full_clean(
 831        self,
 832        *,
 833        exclude: set[str] | Iterable[str] | None = None,
 834        validate_unique: bool = True,
 835        validate_constraints: bool = True,
 836    ) -> None:
 837        """
 838        Call clean_fields(), clean(), validate_unique(), and
 839        validate_constraints() on the model. Raise a ValidationError for any
 840        errors that occur.
 841        """
 842        errors = {}
 843        if exclude is None:
 844            exclude = set()
 845        else:
 846            exclude = set(exclude)
 847
 848        try:
 849            self.clean_fields(exclude=exclude)
 850        except ValidationError as e:
 851            errors = e.update_error_dict(errors)
 852
 853        # Form.clean() is run even if other validation fails, so do the
 854        # same with Model.clean() for consistency.
 855        try:
 856            self.clean()
 857        except ValidationError as e:
 858            errors = e.update_error_dict(errors)
 859
 860        # Run unique checks, but only for fields that passed validation.
 861        if validate_unique:
 862            for name in errors:
 863                if name != NON_FIELD_ERRORS and name not in exclude:
 864                    exclude.add(name)
 865            try:
 866                self.validate_unique(exclude=exclude)
 867            except ValidationError as e:
 868                errors = e.update_error_dict(errors)
 869
 870        # Run constraints checks, but only for fields that passed validation.
 871        if validate_constraints:
 872            for name in errors:
 873                if name != NON_FIELD_ERRORS and name not in exclude:
 874                    exclude.add(name)
 875            try:
 876                self.validate_constraints(exclude=exclude)
 877            except ValidationError as e:
 878                errors = e.update_error_dict(errors)
 879
 880        if errors:
 881            raise ValidationError(errors)
 882
 883    def clean_fields(self, exclude: set[str] | None = None) -> None:
 884        """
 885        Clean all fields and raise a ValidationError containing a dict
 886        of all validation errors if any occur.
 887        """
 888        if exclude is None:
 889            exclude = set()
 890
 891        errors = {}
 892        for f in self._model_meta.fields:
 893            if f.name in exclude:
 894                continue
 895            # Skip validation for empty fields with required=False. The developer
 896            # is responsible for making sure they have a valid value.
 897            raw_value = getattr(self, f.attname)
 898            if not f.required and raw_value in f.empty_values:
 899                continue
 900            try:
 901                setattr(self, f.attname, f.clean(raw_value, self))
 902            except ValidationError as e:
 903                errors[f.name] = e.error_list
 904
 905        if errors:
 906            raise ValidationError(errors)
 907
 908    @classmethod
 909    def preflight(cls) -> list[PreflightResult]:
 910        errors = []
 911
 912        errors += [
 913            *cls._check_fields(),
 914            *cls._check_m2m_through_same_relationship(),
 915            *cls._check_long_column_names(),
 916        ]
 917        clash_errors = (
 918            *cls._check_id_field(),
 919            *cls._check_field_name_clashes(),
 920            *cls._check_model_name_db_lookup_clashes(),
 921            *cls._check_property_name_related_field_accessor_clashes(),
 922            *cls._check_single_primary_key(),
 923        )
 924        errors.extend(clash_errors)
 925        # If there are field name clashes, hide consequent column name
 926        # clashes.
 927        if not clash_errors:
 928            errors.extend(cls._check_column_name_clashes())
 929        errors += [
 930            *cls._check_indexes(),
 931            *cls._check_ordering(),
 932            *cls._check_constraints(),
 933            *cls._check_db_table_comment(),
 934        ]
 935
 936        return errors
 937
 938    @classmethod
 939    def _check_db_table_comment(cls) -> list[PreflightResult]:
 940        if not cls.model_options.db_table_comment:
 941            return []
 942        errors = []
 943        if not (
 944            db_connection.features.supports_comments
 945            or "supports_comments" in cls.model_options.required_db_features
 946        ):
 947            errors.append(
 948                PreflightResult(
 949                    fix=f"{db_connection.display_name} does not support comments on "
 950                    f"tables (db_table_comment).",
 951                    obj=cls,
 952                    id="models.db_table_comment_unsupported",
 953                    warning=True,
 954                )
 955            )
 956        return errors
 957
 958    @classmethod
 959    def _check_fields(cls) -> list[PreflightResult]:
 960        """Perform all field checks."""
 961        errors = []
 962        for field in cls._model_meta.local_fields:
 963            errors.extend(field.preflight(from_model=cls))
 964        for field in cls._model_meta.local_many_to_many:
 965            errors.extend(field.preflight(from_model=cls))
 966        return errors
 967
 968    @classmethod
 969    def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
 970        """Check if no relationship model is used by more than one m2m field."""
 971
 972        errors = []
 973        seen_intermediary_signatures = []
 974
 975        fields = cls._model_meta.local_many_to_many
 976
 977        # Skip when the target model wasn't found.
 978        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
 979
 980        # Skip when the relationship model wasn't found.
 981        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
 982
 983        for f in fields:
 984            signature = (
 985                f.remote_field.model,
 986                cls,
 987                f.remote_field.through,
 988                f.remote_field.through_fields,
 989            )
 990            if signature in seen_intermediary_signatures:
 991                errors.append(
 992                    PreflightResult(
 993                        fix="The model has two identical many-to-many relations "
 994                        f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
 995                        obj=cls,
 996                        id="models.duplicate_many_to_many_relations",
 997                    )
 998                )
 999            else:
1000                seen_intermediary_signatures.append(signature)
1001        return errors
1002
1003    @classmethod
1004    def _check_id_field(cls) -> list[PreflightResult]:
1005        """Disallow user-defined fields named ``id``."""
1006        if any(
1007            f
1008            for f in cls._model_meta.local_fields
1009            if f.name == "id" and not f.auto_created
1010        ):
1011            return [
1012                PreflightResult(
1013                    fix="'id' is a reserved word that cannot be used as a field name.",
1014                    obj=cls,
1015                    id="models.reserved_field_name_id",
1016                )
1017            ]
1018        return []
1019
1020    @classmethod
1021    def _check_field_name_clashes(cls) -> list[PreflightResult]:
1022        """Forbid field shadowing in multi-table inheritance."""
1023        errors = []
1024        used_fields = {}  # name or attname -> field
1025
1026        for f in cls._model_meta.local_fields:
1027            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1028            # Note that we may detect clash between user-defined non-unique
1029            # field "id" and automatically added unique field "id", both
1030            # defined at the same model. This special case is considered in
1031            # _check_id_field and here we ignore it.
1032            id_conflict = (
1033                f.name == "id" and clash and clash.name == "id" and clash.model == cls
1034            )
1035            if clash and not id_conflict:
1036                errors.append(
1037                    PreflightResult(
1038                        fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
1039                        f"from model '{clash.model.model_options}'.",
1040                        obj=f,
1041                        id="models.field_name_clash",
1042                    )
1043                )
1044            used_fields[f.name] = f
1045            used_fields[f.attname] = f
1046
1047        return errors
1048
1049    @classmethod
1050    def _check_column_name_clashes(cls) -> list[PreflightResult]:
1051        # Store a list of column names which have already been used by other fields.
1052        used_column_names = []
1053        errors = []
1054
1055        for f in cls._model_meta.local_fields:
1056            _, column_name = f.get_attname_column()
1057
1058            # Ensure the column name is not already in use.
1059            if column_name and column_name in used_column_names:
1060                errors.append(
1061                    PreflightResult(
1062                        fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1063                        "another field. Specify a 'db_column' for the field.",
1064                        obj=cls,
1065                        id="models.db_column_clash",
1066                    )
1067                )
1068            else:
1069                used_column_names.append(column_name)
1070
1071        return errors
1072
1073    @classmethod
1074    def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1075        errors = []
1076        model_name = cls.__name__
1077        if model_name.startswith("_") or model_name.endswith("_"):
1078            errors.append(
1079                PreflightResult(
1080                    fix=f"The model name '{model_name}' cannot start or end with an underscore "
1081                    "as it collides with the query lookup syntax.",
1082                    obj=cls,
1083                    id="models.model_name_underscore_bounds",
1084                )
1085            )
1086        elif LOOKUP_SEP in model_name:
1087            errors.append(
1088                PreflightResult(
1089                    fix=f"The model name '{model_name}' cannot contain double underscores as "
1090                    "it collides with the query lookup syntax.",
1091                    obj=cls,
1092                    id="models.model_name_double_underscore",
1093                )
1094            )
1095        return errors
1096
1097    @classmethod
1098    def _check_property_name_related_field_accessor_clashes(
1099        cls,
1100    ) -> list[PreflightResult]:
1101        errors = []
1102        property_names = cls._model_meta._property_names
1103        related_field_accessors = (
1104            f.get_attname()
1105            for f in cls._model_meta._get_fields(reverse=False)
1106            if f.is_relation and f.related_model is not None
1107        )
1108        for accessor in related_field_accessors:
1109            if accessor in property_names:
1110                errors.append(
1111                    PreflightResult(
1112                        fix=f"The property '{accessor}' clashes with a related field "
1113                        "accessor.",
1114                        obj=cls,
1115                        id="models.property_related_field_clash",
1116                    )
1117                )
1118        return errors
1119
1120    @classmethod
1121    def _check_single_primary_key(cls) -> list[PreflightResult]:
1122        errors = []
1123        if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1124            errors.append(
1125                PreflightResult(
1126                    fix="The model cannot have more than one field with "
1127                    "'primary_key=True'.",
1128                    obj=cls,
1129                    id="models.multiple_primary_keys",
1130                )
1131            )
1132        return errors
1133
1134    @classmethod
1135    def _check_indexes(cls) -> list[PreflightResult]:
1136        """Check fields, names, and conditions of indexes."""
1137        errors = []
1138        references = set()
1139        for index in cls.model_options.indexes:
1140            # Index name can't start with an underscore or a number, restricted
1141            # for cross-database compatibility with Oracle.
1142            if index.name[0] == "_" or index.name[0].isdigit():
1143                errors.append(
1144                    PreflightResult(
1145                        fix=f"The index name '{index.name}' cannot start with an underscore "
1146                        "or a number.",
1147                        obj=cls,
1148                        id="models.index_name_invalid_start",
1149                    ),
1150                )
1151            if len(index.name) > index.max_name_length:
1152                errors.append(
1153                    PreflightResult(
1154                        fix="The index name '%s' cannot be longer than %d "  # noqa: UP031
1155                        "characters." % (index.name, index.max_name_length),
1156                        obj=cls,
1157                        id="models.index_name_too_long",
1158                    ),
1159                )
1160            if index.contains_expressions:
1161                for expression in index.expressions:
1162                    references.update(
1163                        ref[0] for ref in cls._get_expr_references(expression)
1164                    )
1165        if not (
1166            db_connection.features.supports_partial_indexes
1167            or "supports_partial_indexes" in cls.model_options.required_db_features
1168        ) and any(index.condition is not None for index in cls.model_options.indexes):
1169            errors.append(
1170                PreflightResult(
1171                    fix=f"{db_connection.display_name} does not support indexes with conditions. "
1172                    "Conditions will be ignored. Silence this warning "
1173                    "if you don't care about it.",
1174                    warning=True,
1175                    obj=cls,
1176                    id="models.index_conditions_ignored",
1177                )
1178            )
1179        if not (
1180            db_connection.features.supports_covering_indexes
1181            or "supports_covering_indexes" in cls.model_options.required_db_features
1182        ) and any(index.include for index in cls.model_options.indexes):
1183            errors.append(
1184                PreflightResult(
1185                    fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
1186                    "Non-key columns will be ignored. Silence this "
1187                    "warning if you don't care about it.",
1188                    warning=True,
1189                    obj=cls,
1190                    id="models.index_non_key_columns_ignored",
1191                )
1192            )
1193        if not (
1194            db_connection.features.supports_expression_indexes
1195            or "supports_expression_indexes" in cls.model_options.required_db_features
1196        ) and any(index.contains_expressions for index in cls.model_options.indexes):
1197            errors.append(
1198                PreflightResult(
1199                    fix=f"{db_connection.display_name} does not support indexes on expressions. "
1200                    "An index won't be created. Silence this warning "
1201                    "if you don't care about it.",
1202                    warning=True,
1203                    obj=cls,
1204                    id="models.index_on_foreign_key",
1205                )
1206            )
1207        fields = [
1208            field
1209            for index in cls.model_options.indexes
1210            for field, _ in index.fields_orders
1211        ]
1212        fields += [
1213            include for index in cls.model_options.indexes for include in index.include
1214        ]
1215        fields += references
1216        errors.extend(cls._check_local_fields(fields, "indexes"))
1217        return errors
1218
1219    @classmethod
1220    def _check_local_fields(
1221        cls, fields: Iterable[str], option: str
1222    ) -> list[PreflightResult]:
1223        from plain import models
1224
1225        # In order to avoid hitting the relation tree prematurely, we use our
1226        # own fields_map instead of using get_field()
1227        forward_fields_map = {}
1228        for field in cls._model_meta._get_fields(reverse=False):
1229            forward_fields_map[field.name] = field
1230            if hasattr(field, "attname"):
1231                forward_fields_map[field.attname] = field
1232
1233        errors = []
1234        for field_name in fields:
1235            try:
1236                field = forward_fields_map[field_name]
1237            except KeyError:
1238                errors.append(
1239                    PreflightResult(
1240                        fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1241                        obj=cls,
1242                        id="models.nonexistent_field_reference",
1243                    )
1244                )
1245            else:
1246                if isinstance(field.remote_field, models.ManyToManyRel):
1247                    errors.append(
1248                        PreflightResult(
1249                            fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1250                            f"ManyToManyFields are not permitted in '{option}'.",
1251                            obj=cls,
1252                            id="models.m2m_field_in_meta_option",
1253                        )
1254                    )
1255                elif field not in cls._model_meta.local_fields:
1256                    errors.append(
1257                        PreflightResult(
1258                            fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1259                            f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1260                            obj=cls,
1261                            id="models.non_local_field_reference",
1262                        )
1263                    )
1264        return errors
1265
1266    @classmethod
1267    def _check_ordering(cls) -> list[PreflightResult]:
1268        """
1269        Check "ordering" option -- is it a list of strings and do all fields
1270        exist?
1271        """
1272
1273        if not cls.model_options.ordering:
1274            return []
1275
1276        if not isinstance(cls.model_options.ordering, list | tuple):
1277            return [
1278                PreflightResult(
1279                    fix="'ordering' must be a tuple or list (even if you want to order by "
1280                    "only one field).",
1281                    obj=cls,
1282                    id="models.ordering_not_tuple_or_list",
1283                )
1284            ]
1285
1286        errors = []
1287        fields = cls.model_options.ordering
1288
1289        # Skip expressions and '?' fields.
1290        fields = (f for f in fields if isinstance(f, str) and f != "?")
1291
1292        # Convert "-field" to "field".
1293        fields = (f.removeprefix("-") for f in fields)
1294
1295        # Separate related fields and non-related fields.
1296        _fields = []
1297        related_fields = []
1298        for f in fields:
1299            if LOOKUP_SEP in f:
1300                related_fields.append(f)
1301            else:
1302                _fields.append(f)
1303        fields = _fields
1304
1305        # Check related fields.
1306        for field in related_fields:
1307            _cls = cls
1308            fld = None
1309            for part in field.split(LOOKUP_SEP):
1310                try:
1311                    fld = _cls._model_meta.get_field(part)
1312                    if fld.is_relation:
1313                        _cls = fld.path_infos[-1].to_meta.model
1314                    else:
1315                        _cls = None
1316                except (FieldDoesNotExist, AttributeError):
1317                    if fld is None or (
1318                        fld.get_transform(part) is None and fld.get_lookup(part) is None
1319                    ):
1320                        errors.append(
1321                            PreflightResult(
1322                                fix="'ordering' refers to the nonexistent field, "
1323                                f"related field, or lookup '{field}'.",
1324                                obj=cls,
1325                                id="models.ordering_nonexistent_field",
1326                            )
1327                        )
1328
1329        # Check for invalid or nonexistent fields in ordering.
1330        invalid_fields = []
1331
1332        # Any field name that is not present in field_names does not exist.
1333        # Also, ordering by m2m fields is not allowed.
1334        meta = cls._model_meta
1335        valid_fields = set(
1336            chain.from_iterable(
1337                (f.name, f.attname)
1338                if not (f.auto_created and not f.concrete)
1339                else (f.field.related_query_name(),)
1340                for f in chain(meta.fields, meta.related_objects)
1341            )
1342        )
1343
1344        invalid_fields.extend(set(fields) - valid_fields)
1345
1346        for invalid_field in invalid_fields:
1347            errors.append(
1348                PreflightResult(
1349                    fix="'ordering' refers to the nonexistent field, related "
1350                    f"field, or lookup '{invalid_field}'.",
1351                    obj=cls,
1352                    id="models.ordering_nonexistent_field",
1353                )
1354            )
1355        return errors
1356
1357    @classmethod
1358    def _check_long_column_names(cls) -> list[PreflightResult]:
1359        """
1360        Check that any auto-generated column names are shorter than the limits
1361        for each database in which the model will be created.
1362        """
1363        errors = []
1364        allowed_len = None
1365
1366        max_name_length = db_connection.ops.max_name_length()
1367        if max_name_length is not None and not db_connection.features.truncates_names:
1368            allowed_len = max_name_length
1369
1370        if allowed_len is None:
1371            return errors
1372
1373        for f in cls._model_meta.local_fields:
1374            _, column_name = f.get_attname_column()
1375
1376            # Check if auto-generated name for the field is too long
1377            # for the database.
1378            if (
1379                f.db_column is None
1380                and column_name is not None
1381                and len(column_name) > allowed_len
1382            ):
1383                errors.append(
1384                    PreflightResult(
1385                        fix=f'Autogenerated column name too long for field "{column_name}". '
1386                        f'Maximum length is "{allowed_len}" for the database. '
1387                        "Set the column name manually using 'db_column'.",
1388                        obj=cls,
1389                        id="models.autogenerated_column_name_too_long",
1390                    )
1391                )
1392
1393        for f in cls._model_meta.local_many_to_many:
1394            # Skip nonexistent models.
1395            if isinstance(f.remote_field.through, str):
1396                continue
1397
1398            # Check if auto-generated name for the M2M field is too long
1399            # for the database.
1400            for m2m in f.remote_field.through._model_meta.local_fields:
1401                _, rel_name = m2m.get_attname_column()
1402                if (
1403                    m2m.db_column is None
1404                    and rel_name is not None
1405                    and len(rel_name) > allowed_len
1406                ):
1407                    errors.append(
1408                        PreflightResult(
1409                            fix="Autogenerated column name too long for M2M field "
1410                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
1411                            "Use 'through' to create a separate model for "
1412                            "M2M and then set column_name using 'db_column'.",
1413                            obj=cls,
1414                            id="models.m2m_column_name_too_long",
1415                        )
1416                    )
1417
1418        return errors
1419
1420    @classmethod
1421    def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1422        if isinstance(expr, Q):
1423            for child in expr.children:
1424                if isinstance(child, tuple):
1425                    lookup, value = child
1426                    yield tuple(lookup.split(LOOKUP_SEP))
1427                    yield from cls._get_expr_references(value)
1428                else:
1429                    yield from cls._get_expr_references(child)
1430        elif isinstance(expr, F):
1431            yield tuple(expr.name.split(LOOKUP_SEP))
1432        elif hasattr(expr, "get_source_expressions"):
1433            for src_expr in expr.get_source_expressions():
1434                yield from cls._get_expr_references(src_expr)
1435
1436    @classmethod
1437    def _check_constraints(cls) -> list[PreflightResult]:
1438        errors = []
1439        if not (
1440            db_connection.features.supports_table_check_constraints
1441            or "supports_table_check_constraints"
1442            in cls.model_options.required_db_features
1443        ) and any(
1444            isinstance(constraint, CheckConstraint)
1445            for constraint in cls.model_options.constraints
1446        ):
1447            errors.append(
1448                PreflightResult(
1449                    fix=f"{db_connection.display_name} does not support check constraints. "
1450                    "A constraint won't be created. Silence this "
1451                    "warning if you don't care about it.",
1452                    obj=cls,
1453                    id="models.constraint_on_non_db_field",
1454                    warning=True,
1455                )
1456            )
1457
1458        if not (
1459            db_connection.features.supports_partial_indexes
1460            or "supports_partial_indexes" in cls.model_options.required_db_features
1461        ) and any(
1462            isinstance(constraint, UniqueConstraint)
1463            and constraint.condition is not None
1464            for constraint in cls.model_options.constraints
1465        ):
1466            errors.append(
1467                PreflightResult(
1468                    fix=f"{db_connection.display_name} does not support unique constraints with "
1469                    "conditions. A constraint won't be created. Silence this "
1470                    "warning if you don't care about it.",
1471                    obj=cls,
1472                    id="models.constraint_on_virtual_field",
1473                    warning=True,
1474                )
1475            )
1476
1477        if not (
1478            db_connection.features.supports_deferrable_unique_constraints
1479            or "supports_deferrable_unique_constraints"
1480            in cls.model_options.required_db_features
1481        ) and any(
1482            isinstance(constraint, UniqueConstraint)
1483            and constraint.deferrable is not None
1484            for constraint in cls.model_options.constraints
1485        ):
1486            errors.append(
1487                PreflightResult(
1488                    fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
1489                    "A constraint won't be created. Silence this "
1490                    "warning if you don't care about it.",
1491                    obj=cls,
1492                    id="models.constraint_on_foreign_key",
1493                    warning=True,
1494                )
1495            )
1496
1497        if not (
1498            db_connection.features.supports_covering_indexes
1499            or "supports_covering_indexes" in cls.model_options.required_db_features
1500        ) and any(
1501            isinstance(constraint, UniqueConstraint) and constraint.include
1502            for constraint in cls.model_options.constraints
1503        ):
1504            errors.append(
1505                PreflightResult(
1506                    fix=f"{db_connection.display_name} does not support unique constraints with non-key "
1507                    "columns. A constraint won't be created. Silence this "
1508                    "warning if you don't care about it.",
1509                    obj=cls,
1510                    id="models.constraint_on_m2m_field",
1511                    warning=True,
1512                )
1513            )
1514
1515        if not (
1516            db_connection.features.supports_expression_indexes
1517            or "supports_expression_indexes" in cls.model_options.required_db_features
1518        ) and any(
1519            isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
1520            for constraint in cls.model_options.constraints
1521        ):
1522            errors.append(
1523                PreflightResult(
1524                    fix=f"{db_connection.display_name} does not support unique constraints on "
1525                    "expressions. A constraint won't be created. Silence this "
1526                    "warning if you don't care about it.",
1527                    obj=cls,
1528                    id="models.constraint_on_self_referencing_fk",
1529                    warning=True,
1530                )
1531            )
1532        fields = set(
1533            chain.from_iterable(
1534                (*constraint.fields, *constraint.include)
1535                for constraint in cls.model_options.constraints
1536                if isinstance(constraint, UniqueConstraint)
1537            )
1538        )
1539        references = set()
1540        for constraint in cls.model_options.constraints:
1541            if isinstance(constraint, UniqueConstraint):
1542                if (
1543                    db_connection.features.supports_partial_indexes
1544                    or "supports_partial_indexes"
1545                    not in cls.model_options.required_db_features
1546                ) and isinstance(constraint.condition, Q):
1547                    references.update(cls._get_expr_references(constraint.condition))
1548                if (
1549                    db_connection.features.supports_expression_indexes
1550                    or "supports_expression_indexes"
1551                    not in cls.model_options.required_db_features
1552                ) and constraint.contains_expressions:
1553                    for expression in constraint.expressions:
1554                        references.update(cls._get_expr_references(expression))
1555            elif isinstance(constraint, CheckConstraint):
1556                if (
1557                    db_connection.features.supports_table_check_constraints
1558                    or "supports_table_check_constraints"
1559                    not in cls.model_options.required_db_features
1560                ):
1561                    if isinstance(constraint.check, Q):
1562                        references.update(cls._get_expr_references(constraint.check))
1563                    if any(
1564                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
1565                    ):
1566                        errors.append(
1567                            PreflightResult(
1568                                fix=f"Check constraint {constraint.name!r} contains "
1569                                f"RawSQL() expression and won't be validated "
1570                                f"during the model full_clean(). "
1571                                "Silence this warning if you don't care about it.",
1572                                warning=True,
1573                                obj=cls,
1574                                id="models.constraint_name_collision_autogenerated",
1575                            ),
1576                        )
1577        for field_name, *lookups in references:
1578            fields.add(field_name)
1579            if not lookups:
1580                # If it has no lookups it cannot result in a JOIN.
1581                continue
1582            try:
1583                field = cls._model_meta.get_field(field_name)
1584                if not field.is_relation or field.many_to_many or field.one_to_many:
1585                    continue
1586            except FieldDoesNotExist:
1587                continue
1588            # JOIN must happen at the first lookup.
1589            first_lookup = lookups[0]
1590            if (
1591                hasattr(field, "get_transform")
1592                and hasattr(field, "get_lookup")
1593                and field.get_transform(first_lookup) is None
1594                and field.get_lookup(first_lookup) is None
1595            ):
1596                errors.append(
1597                    PreflightResult(
1598                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1599                        obj=cls,
1600                        id="models.constraint_refers_to_joined_field",
1601                    )
1602                )
1603        errors.extend(cls._check_local_fields(fields, "constraints"))
1604        return errors
1605
1606
1607########
1608# MISC #
1609########
1610
1611
def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
    """Return a bare, uninitialized instance of the model named by *model_id*.

    A tuple is unpacked into ``models_registry.get_model()`` to resolve the
    model class. Any other value is taken to be the model class itself, which
    keeps pickles from earlier versions (where the class was stored directly)
    loadable. The instance is created with ``__new__`` only, so ``__init__``
    is not run here.
    """
    model_cls = (
        models_registry.get_model(*model_id)
        if isinstance(model_id, tuple)
        else model_id
    )
    return model_cls.__new__(model_cls)


# Marker attribute on the function object; presumably read by unpickling
# machinery elsewhere -- NOTE(review): confirm what consumes it.
model_unpickle.__safe_for_unpickle__ = True  # type: ignore[attr-defined]