Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import copy
   4import warnings
   5from collections.abc import Iterable, Iterator, Sequence
   6from itertools import chain
   7from typing import TYPE_CHECKING, Any, ClassVar, dataclass_transform
   8
   9if TYPE_CHECKING:
  10    from plain.models.meta import Meta
  11    from plain.models.options import Options
  12
  13import plain.runtime
  14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
  15from plain.models import models_registry, transaction, types
  16from plain.models.constants import LOOKUP_SEP
  17from plain.models.constraints import CheckConstraint, UniqueConstraint
  18from plain.models.db import (
  19    PLAIN_VERSION_PICKLE_KEY,
  20    DatabaseError,
  21    db_connection,
  22)
  23from plain.models.deletion import Collector
  24from plain.models.exceptions import (
  25    DoesNotExistDescriptor,
  26    FieldDoesNotExist,
  27    MultipleObjectsReturnedDescriptor,
  28)
  29from plain.models.expressions import RawSQL, Value
  30from plain.models.fields import NOT_PROVIDED
  31from plain.models.fields.reverse_related import ForeignObjectRel
  32from plain.models.meta import Meta
  33from plain.models.options import Options
  34from plain.models.query import F, Q, QuerySet
  35from plain.preflight import PreflightResult
  36from plain.utils.encoding import force_str
  37from plain.utils.hashable import make_hashable
  38
  39
  40class Deferred:
  41    def __repr__(self) -> str:
  42        return "<Deferred field>"
  43
  44    def __str__(self) -> str:
  45        return "<Deferred field>"
  46
  47
  48DEFERRED = Deferred()
  49
  50
  51@dataclass_transform(kw_only_default=True)
  52class ModelBase(type):
  53    """Metaclass for all models."""
  54
  55    def __new__(
  56        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
  57    ) -> type:
  58        # Don't do any of this for the root models.Model class.
  59        if not bases:
  60            return super().__new__(cls, name, bases, attrs)
  61
  62        for base in bases:
  63            # Models are required to directly inherit from model.Model, not a subclass of it.
  64            if issubclass(base, Model) and base is not Model:
  65                raise TypeError(
  66                    f"A model can't extend another model: {name} extends {base}"
  67                )
  68
  69        return super().__new__(cls, name, bases, attrs, **kwargs)
  70
  71
  72class ModelStateFieldsCacheDescriptor:
  73    def __get__(
  74        self, instance: ModelState | None, cls: type | None = None
  75    ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
  76        if instance is None:
  77            return self
  78        res = instance.fields_cache = {}
  79        return res
  80
  81
  82class ModelState:
  83    """Store model instance state."""
  84
  85    # If true, uniqueness validation checks will consider this a new, unsaved
  86    # object. Necessary for correct validation of new instances of objects with
  87    # explicit (non-auto) PKs. This impacts validation only; it has no effect
  88    # on the actual save.
  89    adding = True
  90    fields_cache = ModelStateFieldsCacheDescriptor()
  91
  92
  93class Model(metaclass=ModelBase):
  94    # Every model gets an automatic id field
  95    id: int = types.PrimaryKeyField()
  96
  97    # Descriptors for other model behavior
  98    query: ClassVar[QuerySet[Model]] = QuerySet()
  99    model_options = Options()
 100    _model_meta = Meta()
 101    DoesNotExist = DoesNotExistDescriptor()
 102    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
 103
 104    def __init__(self, **kwargs: Any):
 105        # Alias some things as locals to avoid repeat global lookups
 106        cls = self.__class__
 107        meta = cls._model_meta
 108        _setattr = setattr
 109        _DEFERRED = DEFERRED
 110
 111        # Set up the storage for instance state
 112        self._state = ModelState()
 113
 114        # Process all fields from kwargs or use defaults
 115        for field in meta.fields:
 116            is_related_object = False
 117            # Virtual field
 118            if field.attname not in kwargs and field.column is None:
 119                continue
 120            if isinstance(field.remote_field, ForeignObjectRel):
 121                try:
 122                    # Assume object instance was passed in.
 123                    rel_obj = kwargs.pop(field.name)
 124                    is_related_object = True
 125                except KeyError:
 126                    try:
 127                        # Object instance wasn't passed in -- must be an ID.
 128                        val = kwargs.pop(field.attname)
 129                    except KeyError:
 130                        val = field.get_default()
 131            else:
 132                try:
 133                    val = kwargs.pop(field.attname)
 134                except KeyError:
 135                    # This is done with an exception rather than the
 136                    # default argument on pop because we don't want
 137                    # get_default() to be evaluated, and then not used.
 138                    # Refs #12057.
 139                    val = field.get_default()
 140
 141            if is_related_object:
 142                # If we are passed a related instance, set it using the
 143                # field.name instead of field.attname (e.g. "user" instead of
 144                # "user_id") so that the object gets properly cached (and type
 145                # checked) by the RelatedObjectDescriptor.
 146                if rel_obj is not _DEFERRED:
 147                    _setattr(self, field.name, rel_obj)
 148            else:
 149                if val is not _DEFERRED:
 150                    _setattr(self, field.attname, val)
 151
 152        # Handle any remaining kwargs (properties or virtual fields)
 153        property_names = meta._property_names
 154        unexpected = ()
 155        for prop, value in kwargs.items():
 156            # Any remaining kwargs must correspond to properties or virtual
 157            # fields.
 158            if prop in property_names:
 159                if value is not _DEFERRED:
 160                    _setattr(self, prop, value)
 161            else:
 162                try:
 163                    meta.get_field(prop)
 164                except FieldDoesNotExist:
 165                    unexpected += (prop,)
 166                else:
 167                    if value is not _DEFERRED:
 168                        _setattr(self, prop, value)
 169        if unexpected:
 170            unexpected_names = ", ".join(repr(n) for n in unexpected)
 171            raise TypeError(
 172                f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
 173            )
 174
 175        super().__init__()
 176
 177    @classmethod
 178    def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
 179        if len(values) != len(cls._model_meta.concrete_fields):
 180            values_iter = iter(values)
 181            values = [
 182                next(values_iter) if f.attname in field_names else DEFERRED
 183                for f in cls._model_meta.concrete_fields
 184            ]
 185        # Build kwargs dict from field names and values
 186        field_dict = dict(
 187            zip((f.attname for f in cls._model_meta.concrete_fields), values)
 188        )
 189        new = cls(**field_dict)
 190        new._state.adding = False
 191        return new
 192
 193    def __repr__(self) -> str:
 194        return f"<{self.__class__.__name__}: {self}>"
 195
 196    def __str__(self) -> str:
 197        return f"{self.__class__.__name__} object ({self.id})"
 198
 199    def __eq__(self, other: object) -> bool:
 200        if not isinstance(other, Model):
 201            return NotImplemented
 202        if self.__class__ != other.__class__:
 203            return False
 204        my_id = self.id
 205        if my_id is None:
 206            return self is other
 207        return my_id == other.id
 208
 209    def __hash__(self) -> int:
 210        if self.id is None:
 211            raise TypeError("Model instances without primary key value are unhashable")
 212        return hash(self.id)
 213
 214    def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
 215        data = self.__getstate__()
 216        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
 217        class_id = (
 218            self.model_options.package_label,
 219            self.model_options.object_name,
 220        )
 221        return model_unpickle, (class_id,), data
 222
 223    def __getstate__(self) -> dict[str, Any]:
 224        """Hook to allow choosing the attributes to pickle."""
 225        state = self.__dict__.copy()
 226        state["_state"] = copy.copy(state["_state"])
 227        state["_state"].fields_cache = state["_state"].fields_cache.copy()
 228        # memoryview cannot be pickled, so cast it to bytes and store
 229        # separately.
 230        _memoryview_attrs = []
 231        for attr, value in state.items():
 232            if isinstance(value, memoryview):
 233                _memoryview_attrs.append((attr, bytes(value)))
 234        if _memoryview_attrs:
 235            state["_memoryview_attrs"] = _memoryview_attrs
 236            for attr, value in _memoryview_attrs:
 237                state.pop(attr)
 238        return state
 239
 240    def __setstate__(self, state: dict[str, Any]) -> None:
 241        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
 242        if pickled_version:
 243            if pickled_version != plain.runtime.__version__:
 244                warnings.warn(
 245                    f"Pickled model instance's Plain version {pickled_version} does not "
 246                    f"match the current version {plain.runtime.__version__}.",
 247                    RuntimeWarning,
 248                    stacklevel=2,
 249                )
 250        else:
 251            warnings.warn(
 252                "Pickled model instance's Plain version is not specified.",
 253                RuntimeWarning,
 254                stacklevel=2,
 255            )
 256        if "_memoryview_attrs" in state:
 257            for attr, value in state.pop("_memoryview_attrs"):
 258                state[attr] = memoryview(value)
 259        self.__dict__.update(state)
 260
 261    def get_deferred_fields(self) -> set[str]:
 262        """
 263        Return a set containing names of deferred fields for this instance.
 264        """
 265        return {
 266            f.attname
 267            for f in self._model_meta.concrete_fields
 268            if f.attname not in self.__dict__
 269        }
 270
 271    def refresh_from_db(self, fields: list[str] | None = None) -> None:
 272        """
 273        Reload field values from the database.
 274
 275        By default, the reloading happens from the database this instance was
 276        loaded from, or by the read router if this instance wasn't loaded from
 277        any database. The using parameter will override the default.
 278
 279        Fields can be used to specify which fields to reload. The fields
 280        should be an iterable of field attnames. If fields is None, then
 281        all non-deferred fields are reloaded.
 282
 283        When accessing deferred fields of an instance, the deferred loading
 284        of the field will call this method.
 285        """
 286        if fields is None:
 287            self._prefetched_objects_cache = {}
 288        else:
 289            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
 290            for field in fields:
 291                if field in prefetched_objects_cache:
 292                    del prefetched_objects_cache[field]  # type: ignore[misc]
 293                    fields.remove(field)
 294            if not fields:
 295                return
 296            if any(LOOKUP_SEP in f for f in fields):
 297                raise ValueError(
 298                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
 299                    "are not allowed in fields."
 300                )
 301
 302        db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
 303
 304        # Use provided fields, if not set then reload all non-deferred fields.
 305        deferred_fields = self.get_deferred_fields()
 306        if fields is not None:
 307            fields = list(fields)
 308            db_instance_qs = db_instance_qs.only(*fields)
 309        elif deferred_fields:
 310            fields = [
 311                f.attname
 312                for f in self._model_meta.concrete_fields
 313                if f.attname not in deferred_fields
 314            ]
 315            db_instance_qs = db_instance_qs.only(*fields)
 316
 317        db_instance = db_instance_qs.get()
 318        non_loaded_fields = db_instance.get_deferred_fields()
 319        for field in self._model_meta.concrete_fields:
 320            if field.attname in non_loaded_fields:
 321                # This field wasn't refreshed - skip ahead.
 322                continue
 323            setattr(self, field.attname, getattr(db_instance, field.attname))
 324            # Clear cached foreign keys.
 325            if field.is_relation and field.is_cached(self):
 326                field.delete_cached_value(self)
 327
 328        # Clear cached relations.
 329        for field in self._model_meta.related_objects:
 330            if field.is_cached(self):
 331                field.delete_cached_value(self)
 332
 333    def serializable_value(self, field_name: str) -> Any:
 334        """
 335        Return the value of the field name for this instance. If the field is
 336        a foreign key, return the id value instead of the object. If there's
 337        no Field object with this name on the model, return the model
 338        attribute's value.
 339
 340        Used to serialize a field's value (in the serializer, or form output,
 341        for example). Normally, you would just access the attribute directly
 342        and not use this method.
 343        """
 344        try:
 345            field = self._model_meta.get_field(field_name)
 346        except FieldDoesNotExist:
 347            return getattr(self, field_name)
 348        return getattr(self, field.attname)
 349
 350    def save(
 351        self,
 352        *,
 353        clean_and_validate: bool = True,
 354        force_insert: bool = False,
 355        force_update: bool = False,
 356        update_fields: Iterable[str] | None = None,
 357    ) -> None:
 358        """
 359        Save the current instance. Override this in a subclass if you want to
 360        control the saving process.
 361
 362        The 'force_insert' and 'force_update' parameters can be used to insist
 363        that the "save" must be an SQL insert or update (or equivalent for
 364        non-SQL backends), respectively. Normally, they should not be set.
 365        """
 366        self._prepare_related_fields_for_save(operation_name="save")
 367
 368        if force_insert and (force_update or update_fields):
 369            raise ValueError("Cannot force both insert and updating in model saving.")
 370
 371        deferred_fields = self.get_deferred_fields()
 372        if update_fields is not None:
 373            # If update_fields is empty, skip the save. We do also check for
 374            # no-op saves later on for inheritance cases. This bailout is
 375            # still needed for skipping signal sending.
 376            if not update_fields:
 377                return
 378
 379            update_fields = frozenset(update_fields)
 380            field_names = self._model_meta._non_pk_concrete_field_names
 381            non_model_fields = update_fields.difference(field_names)
 382
 383            if non_model_fields:
 384                raise ValueError(
 385                    "The following fields do not exist in this model, are m2m "
 386                    "fields, or are non-concrete fields: {}".format(
 387                        ", ".join(non_model_fields)
 388                    )
 389                )
 390
 391        # If this model is deferred, automatically do an "update_fields" save
 392        # on the loaded fields.
 393        elif not force_insert and deferred_fields:
 394            field_names = set()
 395            for field in self._model_meta.concrete_fields:
 396                if not field.primary_key and not hasattr(field, "through"):
 397                    field_names.add(field.attname)
 398            loaded_fields = field_names.difference(deferred_fields)
 399            if loaded_fields:
 400                update_fields = frozenset(loaded_fields)
 401
 402        if clean_and_validate:
 403            self.full_clean(exclude=deferred_fields)
 404
 405        self.save_base(
 406            force_insert=force_insert,
 407            force_update=force_update,
 408            update_fields=update_fields,
 409        )
 410
 411    def save_base(
 412        self,
 413        *,
 414        raw: bool = False,
 415        force_insert: bool = False,
 416        force_update: bool = False,
 417        update_fields: Iterable[str] | None = None,
 418    ) -> None:
 419        """
 420        Handle the parts of saving which should be done only once per save,
 421        yet need to be done in raw saves, too. This includes some sanity
 422        checks and signal sending.
 423
 424        The 'raw' argument is telling save_base not to save any parent
 425        models and not to do any changes to the values before save. This
 426        is used by fixture loading.
 427        """
 428        assert not (force_insert and (force_update or update_fields))
 429        assert update_fields is None or update_fields
 430        cls = self.__class__
 431
 432        with transaction.mark_for_rollback_on_error():
 433            self._save_table(
 434                raw=raw,
 435                cls=cls,
 436                force_insert=force_insert,
 437                force_update=force_update,
 438                update_fields=update_fields,
 439            )
 440        # Once saved, this is no longer a to-be-added instance.
 441        self._state.adding = False
 442
 443    def _save_table(
 444        self,
 445        *,
 446        raw: bool,
 447        cls: type[Model],
 448        force_insert: bool = False,
 449        force_update: bool = False,
 450        update_fields: Iterable[str] | None = None,
 451    ) -> bool:
 452        """
 453        Do the heavy-lifting involved in saving. Update or insert the data
 454        for a single table.
 455        """
 456        meta = cls._model_meta
 457        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
 458
 459        if update_fields:
 460            non_pks = [
 461                f
 462                for f in non_pks
 463                if f.name in update_fields or f.attname in update_fields
 464            ]
 465
 466        id_val = self.id
 467        if id_val is None:
 468            id_field = meta.get_field("id")
 469            id_val = id_field.get_id_value_on_save(self)
 470            setattr(self, id_field.attname, id_val)
 471        id_set = id_val is not None
 472        if not id_set and (force_update or update_fields):
 473            raise ValueError("Cannot force an update in save() with no primary key.")
 474        updated = False
 475        # Skip an UPDATE when adding an instance and primary key has a default.
 476        if (
 477            not raw
 478            and not force_insert
 479            and self._state.adding
 480            and meta.get_field("id").default
 481            and meta.get_field("id").default is not NOT_PROVIDED
 482        ):
 483            force_insert = True
 484        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
 485        if id_set and not force_insert:
 486            base_qs = meta.base_queryset
 487            values = [
 488                (
 489                    f,
 490                    None,
 491                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
 492                )
 493                for f in non_pks
 494            ]
 495            forced_update = bool(update_fields or force_update)
 496            updated = self._do_update(
 497                base_qs, id_val, values, update_fields, forced_update
 498            )
 499            if force_update and not updated:
 500                raise DatabaseError("Forced update did not affect any rows.")
 501            if update_fields and not updated:
 502                raise DatabaseError("Save with update_fields did not affect any rows.")
 503        if not updated:
 504            fields = meta.local_concrete_fields
 505            if not id_set:
 506                id_field = meta.get_field("id")
 507                fields = [f for f in fields if f is not id_field]
 508
 509            returning_fields = meta.db_returning_fields
 510            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
 511            if results:
 512                for value, field in zip(results[0], returning_fields):
 513                    setattr(self, field.attname, value)
 514        return updated
 515
 516    def _do_update(
 517        self,
 518        base_qs: QuerySet,
 519        id_val: Any,
 520        values: list[tuple[Any, Any, Any]],
 521        update_fields: Iterable[str] | None,
 522        forced_update: bool,
 523    ) -> bool:
 524        """
 525        Try to update the model. Return True if the model was updated (if an
 526        update query was done and a matching row was found in the DB).
 527        """
 528        filtered = base_qs.filter(id=id_val)
 529        if not values:
 530            # We can end up here when saving a model in inheritance chain where
 531            # update_fields doesn't target any field in current model. In that
 532            # case we just say the update succeeded. Another case ending up here
 533            # is a model with just PK - in that case check that the PK still
 534            # exists.
 535            return update_fields is not None or filtered.exists()
 536        return filtered._update(values) > 0
 537
 538    def _do_insert(
 539        self,
 540        manager: QuerySet,
 541        fields: Sequence[Any],
 542        returning_fields: Sequence[Any],
 543        raw: bool,
 544    ) -> list[Any]:
 545        """
 546        Do an INSERT. If returning_fields is defined then this method should
 547        return the newly created data for the model.
 548        """
 549        return manager._insert(  # type: ignore[return-value, arg-type]
 550            [self],
 551            fields=fields,  # type: ignore[arg-type]
 552            returning_fields=returning_fields,  # type: ignore[arg-type]
 553            raw=raw,
 554        )
 555
 556    def _prepare_related_fields_for_save(
 557        self, operation_name: str, fields: Sequence[Any] | None = None
 558    ) -> None:
 559        # Ensure that a model instance without a PK hasn't been assigned to
 560        # a ForeignKey on this model. If the field is nullable, allowing the save would result in silent data loss.
 561        for field in self._model_meta.concrete_fields:
 562            if fields and field not in fields:
 563                continue
 564            # If the related field isn't cached, then an instance hasn't been
 565            # assigned and there's no need to worry about this check.
 566            if field.is_relation and field.is_cached(self):
 567                obj = getattr(self, field.name, None)
 568                if not obj:
 569                    continue
 570                # A pk may have been assigned manually to a model instance not
 571                # saved to the database (or auto-generated in a case like
 572                # UUIDField), but we allow the save to proceed and rely on the
 573                # database to raise an IntegrityError if applicable. If
 574                # constraints aren't supported by the database, there's the
 575                # unavoidable risk of data corruption.
 576                if obj.id is None:
 577                    # Remove the object from a related instance cache.
 578                    if not field.remote_field.multiple:
 579                        field.remote_field.delete_cached_value(obj)
 580                    raise ValueError(
 581                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
 582                        f"related object '{field.name}'."
 583                    )
 584                elif getattr(self, field.attname) in field.empty_values:
 585                    # Set related object if it has been saved after an
 586                    # assignment.
 587                    setattr(self, field.name, obj)
 588                # If the relationship's pk/to_field was changed, clear the
 589                # cached relationship.
 590                if getattr(obj, field.target_field.attname) != getattr(
 591                    self, field.attname
 592                ):
 593                    field.delete_cached_value(self)
 594
 595    def delete(self) -> tuple[int, dict[str, int]]:
 596        if self.id is None:
 597            raise ValueError(
 598                f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
 599                "to None."
 600            )
 601        collector = Collector(origin=self)
 602        collector.collect([self])
 603        return collector.delete()
 604
 605    def get_field_display(self, field_name: str) -> str:
 606        """Get the display value for a field, especially useful for fields with choices."""
 607        # Get the field object from the field name
 608        field = self._model_meta.get_field(field_name)
 609        value = getattr(self, field.attname)
 610
 611        # If field has no choices, just return the value as string
 612        if not hasattr(field, "flatchoices") or not field.flatchoices:
 613            return force_str(value, strings_only=True)
 614
 615        # For fields with choices, look up the display value
 616        choices_dict = dict(make_hashable(field.flatchoices))
 617        return force_str(
 618            choices_dict.get(make_hashable(value), value), strings_only=True
 619        )
 620
 621    def _get_field_value_map(
 622        self, meta: Meta | None, exclude: set[str] | None = None
 623    ) -> dict[str, Value]:
 624        if exclude is None:
 625            exclude = set()
 626        meta = meta or self._model_meta
 627        return {
 628            field.name: Value(getattr(self, field.attname), field)
 629            for field in meta.local_concrete_fields
 630            if field.name not in exclude
 631        }
 632
 633    def prepare_database_save(self, field: Any) -> Any:
 634        if self.id is None:
 635            raise ValueError(
 636                f"Unsaved model instance {self!r} cannot be used in an ORM query."
 637            )
 638        return getattr(self, field.remote_field.get_related_field().attname)
 639
 640    def clean(self) -> None:
 641        """
 642        Hook for doing any extra model-wide validation after clean() has been
 643        called on every field by self.clean_fields. Any ValidationError raised
 644        by this method will not be associated with a particular field; it will
 645        have a special-case association with the field defined by NON_FIELD_ERRORS.
 646        """
 647        pass
 648
 649    def validate_unique(self, exclude: set[str] | None = None) -> None:
 650        """
 651        Check unique constraints on the model and raise ValidationError if any
 652        failed.
 653        """
 654        unique_checks = self._get_unique_checks(exclude=exclude)
 655
 656        if errors := self._perform_unique_checks(unique_checks):
 657            raise ValidationError(errors)
 658
 659    def _get_unique_checks(
 660        self, exclude: set[str] | None = None
 661    ) -> list[tuple[type[Model], tuple[str, ...]]]:
 662        """
 663        Return a list of checks to perform. Since validate_unique() could be
 664        called from a ModelForm, some fields may have been excluded; we can't
 665        perform a unique check on a model that is missing fields involved
 666        in that check. Fields that did not validate should also be excluded,
 667        but they need to be passed in via the exclude argument.
 668        """
 669        if exclude is None:
 670            exclude = set()
 671        unique_checks = []
 672
 673        # Gather a list of checks for fields declared as unique and add them to
 674        # the list of checks.
 675
 676        fields_with_class = [(self.__class__, self._model_meta.local_fields)]
 677
 678        for model_class, fields in fields_with_class:
 679            for f in fields:
 680                name = f.name
 681                if name in exclude:
 682                    continue
 683                if f.primary_key:
 684                    unique_checks.append((model_class, (name,)))
 685
 686        return unique_checks
 687
 688    def _perform_unique_checks(
 689        self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
 690    ) -> dict[str, list[ValidationError]]:
 691        errors = {}
 692
 693        for model_class, unique_check in unique_checks:
 694            # Try to look up an existing object with the same values as this
 695            # object's values for all the unique field.
 696
 697            lookup_kwargs = {}
 698            for field_name in unique_check:
 699                f = self._model_meta.get_field(field_name)
 700                lookup_value = getattr(self, f.attname)
 701                # TODO: Handle multiple backends with different feature flags.
 702                if lookup_value is None:
 703                    # no value, skip the lookup
 704                    continue
 705                if f.primary_key and not self._state.adding:
 706                    # no need to check for unique primary key when editing
 707                    continue
 708                lookup_kwargs[str(field_name)] = lookup_value
 709
 710            # some fields were skipped, no reason to do the check
 711            if len(unique_check) != len(lookup_kwargs):
 712                continue
 713
 714            qs = model_class.query.filter(**lookup_kwargs)  # type: ignore[attr-defined]
 715
 716            # Exclude the current object from the query if we are editing an
 717            # instance (as opposed to creating a new one)
 718            # Use the primary key defined by model_class. In previous versions
 719            # this could differ from `self.id` due to model inheritance.
 720            model_class_id = getattr(self, "id")
 721            if not self._state.adding and model_class_id is not None:
 722                qs = qs.exclude(id=model_class_id)
 723            if qs.exists():
 724                if len(unique_check) == 1:
 725                    key = unique_check[0]
 726                else:
 727                    key = NON_FIELD_ERRORS
 728                errors.setdefault(key, []).append(
 729                    self.unique_error_message(model_class, unique_check)
 730                )
 731
 732        return errors
 733
 734    def unique_error_message(
 735        self, model_class: type[Model], unique_check: tuple[str, ...]
 736    ) -> ValidationError:
 737        meta = model_class._model_meta
 738
 739        params = {
 740            "model": self,
 741            "model_class": model_class,
 742            "model_name": model_class.model_options.model_name,
 743            "unique_check": unique_check,
 744        }
 745
 746        if len(unique_check) == 1:
 747            field = meta.get_field(unique_check[0])
 748            params["field_label"] = field.name
 749            return ValidationError(
 750                message=field.error_messages["unique"],
 751                code="unique",
 752                params=params,
 753            )
 754        else:
 755            field_names = [meta.get_field(f).name for f in unique_check]
 756
 757            # Put an "and" before the last one
 758            field_names[-1] = f"and {field_names[-1]}"
 759
 760            if len(field_names) > 2:
 761                # Comma join if more than 2
 762                params["field_label"] = ", ".join(field_names)
 763            else:
 764                # Just a space if there are only 2
 765                params["field_label"] = " ".join(field_names)
 766
 767            # Use the first field as the message format...
 768            message = meta.get_field(unique_check[0]).error_messages["unique"]
 769
 770            return ValidationError(
 771                message=message,
 772                code="unique",
 773                params=params,
 774            )
 775
 776    def get_constraints(self) -> list[tuple[type, list[Any]]]:
 777        constraints = [(self.__class__, list(self.model_options.constraints))]
 778        return constraints
 779
 780    def validate_constraints(self, exclude: set[str] | None = None) -> None:
 781        constraints = self.get_constraints()
 782
 783        errors = {}
 784        for model_class, model_constraints in constraints:
 785            for constraint in model_constraints:
 786                try:
 787                    constraint.validate(model_class, self, exclude=exclude)
 788                except ValidationError as e:
 789                    if (
 790                        getattr(e, "code", None) == "unique"
 791                        and len(constraint.fields) == 1
 792                    ):
 793                        errors.setdefault(constraint.fields[0], []).append(e)
 794                    else:
 795                        errors = e.update_error_dict(errors)
 796        if errors:
 797            raise ValidationError(errors)
 798
 799    def full_clean(
 800        self,
 801        *,
 802        exclude: set[str] | Iterable[str] | None = None,
 803        validate_unique: bool = True,
 804        validate_constraints: bool = True,
 805    ) -> None:
 806        """
 807        Call clean_fields(), clean(), validate_unique(), and
 808        validate_constraints() on the model. Raise a ValidationError for any
 809        errors that occur.
 810        """
 811        errors = {}
 812        if exclude is None:
 813            exclude = set()
 814        else:
 815            exclude = set(exclude)
 816
 817        try:
 818            self.clean_fields(exclude=exclude)
 819        except ValidationError as e:
 820            errors = e.update_error_dict(errors)
 821
 822        # Form.clean() is run even if other validation fails, so do the
 823        # same with Model.clean() for consistency.
 824        try:
 825            self.clean()
 826        except ValidationError as e:
 827            errors = e.update_error_dict(errors)
 828
 829        # Run unique checks, but only for fields that passed validation.
 830        if validate_unique:
 831            for name in errors:
 832                if name != NON_FIELD_ERRORS and name not in exclude:
 833                    exclude.add(name)
 834            try:
 835                self.validate_unique(exclude=exclude)
 836            except ValidationError as e:
 837                errors = e.update_error_dict(errors)
 838
 839        # Run constraints checks, but only for fields that passed validation.
 840        if validate_constraints:
 841            for name in errors:
 842                if name != NON_FIELD_ERRORS and name not in exclude:
 843                    exclude.add(name)
 844            try:
 845                self.validate_constraints(exclude=exclude)
 846            except ValidationError as e:
 847                errors = e.update_error_dict(errors)
 848
 849        if errors:
 850            raise ValidationError(errors)
 851
 852    def clean_fields(self, exclude: set[str] | None = None) -> None:
 853        """
 854        Clean all fields and raise a ValidationError containing a dict
 855        of all validation errors if any occur.
 856        """
 857        if exclude is None:
 858            exclude = set()
 859
 860        errors = {}
 861        for f in self._model_meta.fields:
 862            if f.name in exclude:
 863                continue
 864            # Skip validation for empty fields with required=False. The developer
 865            # is responsible for making sure they have a valid value.
 866            raw_value = getattr(self, f.attname)
 867            if not f.required and raw_value in f.empty_values:
 868                continue
 869            try:
 870                setattr(self, f.attname, f.clean(raw_value, self))
 871            except ValidationError as e:
 872                errors[f.name] = e.error_list
 873
 874        if errors:
 875            raise ValidationError(errors)
 876
 877    @classmethod
 878    def preflight(cls) -> list[PreflightResult]:
 879        errors = []
 880
 881        errors += [
 882            *cls._check_fields(),
 883            *cls._check_m2m_through_same_relationship(),
 884            *cls._check_long_column_names(),
 885        ]
 886        clash_errors = (
 887            *cls._check_id_field(),
 888            *cls._check_field_name_clashes(),
 889            *cls._check_model_name_db_lookup_clashes(),
 890            *cls._check_property_name_related_field_accessor_clashes(),
 891            *cls._check_single_primary_key(),
 892        )
 893        errors.extend(clash_errors)
 894        # If there are field name clashes, hide consequent column name
 895        # clashes.
 896        if not clash_errors:
 897            errors.extend(cls._check_column_name_clashes())
 898        errors += [
 899            *cls._check_indexes(),
 900            *cls._check_ordering(),
 901            *cls._check_constraints(),
 902            *cls._check_db_table_comment(),
 903        ]
 904
 905        return errors
 906
 907    @classmethod
 908    def _check_db_table_comment(cls) -> list[PreflightResult]:
 909        if not cls.model_options.db_table_comment:
 910            return []
 911        errors = []
 912        if not (
 913            db_connection.features.supports_comments
 914            or "supports_comments" in cls.model_options.required_db_features
 915        ):
 916            errors.append(
 917                PreflightResult(
 918                    fix=f"{db_connection.display_name} does not support comments on "
 919                    f"tables (db_table_comment).",
 920                    obj=cls,
 921                    id="models.db_table_comment_unsupported",
 922                    warning=True,
 923                )
 924            )
 925        return errors
 926
 927    @classmethod
 928    def _check_fields(cls) -> list[PreflightResult]:
 929        """Perform all field checks."""
 930        errors = []
 931        for field in cls._model_meta.local_fields:
 932            errors.extend(field.preflight(from_model=cls))
 933        for field in cls._model_meta.local_many_to_many:
 934            errors.extend(field.preflight(from_model=cls))
 935        return errors
 936
 937    @classmethod
 938    def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
 939        """Check if no relationship model is used by more than one m2m field."""
 940
 941        errors = []
 942        seen_intermediary_signatures = []
 943
 944        fields = cls._model_meta.local_many_to_many
 945
 946        # Skip when the target model wasn't found.
 947        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
 948
 949        # Skip when the relationship model wasn't found.
 950        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
 951
 952        for f in fields:
 953            signature = (
 954                f.remote_field.model,
 955                cls,
 956                f.remote_field.through,
 957                f.remote_field.through_fields,
 958            )
 959            if signature in seen_intermediary_signatures:
 960                errors.append(
 961                    PreflightResult(
 962                        fix="The model has two identical many-to-many relations "
 963                        f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
 964                        obj=cls,
 965                        id="models.duplicate_many_to_many_relations",
 966                    )
 967                )
 968            else:
 969                seen_intermediary_signatures.append(signature)
 970        return errors
 971
 972    @classmethod
 973    def _check_id_field(cls) -> list[PreflightResult]:
 974        """Disallow user-defined fields named ``id``."""
 975        if any(
 976            f
 977            for f in cls._model_meta.local_fields
 978            if f.name == "id" and not f.auto_created
 979        ):
 980            return [
 981                PreflightResult(
 982                    fix="'id' is a reserved word that cannot be used as a field name.",
 983                    obj=cls,
 984                    id="models.reserved_field_name_id",
 985                )
 986            ]
 987        return []
 988
 989    @classmethod
 990    def _check_field_name_clashes(cls) -> list[PreflightResult]:
 991        """Forbid field shadowing in multi-table inheritance."""
 992        errors = []
 993        used_fields = {}  # name or attname -> field
 994
 995        for f in cls._model_meta.local_fields:
 996            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
 997            # Note that we may detect clash between user-defined non-unique
 998            # field "id" and automatically added unique field "id", both
 999            # defined at the same model. This special case is considered in
1000            # _check_id_field and here we ignore it.
1001            id_conflict = (
1002                f.name == "id" and clash and clash.name == "id" and clash.model == cls
1003            )
1004            if clash and not id_conflict:
1005                errors.append(
1006                    PreflightResult(
1007                        fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
1008                        f"from model '{clash.model.model_options}'.",
1009                        obj=f,
1010                        id="models.field_name_clash",
1011                    )
1012                )
1013            used_fields[f.name] = f
1014            used_fields[f.attname] = f
1015
1016        return errors
1017
1018    @classmethod
1019    def _check_column_name_clashes(cls) -> list[PreflightResult]:
1020        # Store a list of column names which have already been used by other fields.
1021        used_column_names = []
1022        errors = []
1023
1024        for f in cls._model_meta.local_fields:
1025            _, column_name = f.get_attname_column()
1026
1027            # Ensure the column name is not already in use.
1028            if column_name and column_name in used_column_names:
1029                errors.append(
1030                    PreflightResult(
1031                        fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1032                        "another field. Specify a 'db_column' for the field.",
1033                        obj=cls,
1034                        id="models.db_column_clash",
1035                    )
1036                )
1037            else:
1038                used_column_names.append(column_name)
1039
1040        return errors
1041
1042    @classmethod
1043    def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1044        errors = []
1045        model_name = cls.__name__
1046        if model_name.startswith("_") or model_name.endswith("_"):
1047            errors.append(
1048                PreflightResult(
1049                    fix=f"The model name '{model_name}' cannot start or end with an underscore "
1050                    "as it collides with the query lookup syntax.",
1051                    obj=cls,
1052                    id="models.model_name_underscore_bounds",
1053                )
1054            )
1055        elif LOOKUP_SEP in model_name:
1056            errors.append(
1057                PreflightResult(
1058                    fix=f"The model name '{model_name}' cannot contain double underscores as "
1059                    "it collides with the query lookup syntax.",
1060                    obj=cls,
1061                    id="models.model_name_double_underscore",
1062                )
1063            )
1064        return errors
1065
1066    @classmethod
1067    def _check_property_name_related_field_accessor_clashes(
1068        cls,
1069    ) -> list[PreflightResult]:
1070        errors = []
1071        property_names = cls._model_meta._property_names
1072        related_field_accessors = (
1073            f.get_attname()
1074            for f in cls._model_meta._get_fields(reverse=False)
1075            if f.is_relation and f.related_model is not None
1076        )
1077        for accessor in related_field_accessors:
1078            if accessor in property_names:
1079                errors.append(
1080                    PreflightResult(
1081                        fix=f"The property '{accessor}' clashes with a related field "
1082                        "accessor.",
1083                        obj=cls,
1084                        id="models.property_related_field_clash",
1085                    )
1086                )
1087        return errors
1088
1089    @classmethod
1090    def _check_single_primary_key(cls) -> list[PreflightResult]:
1091        errors = []
1092        if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1093            errors.append(
1094                PreflightResult(
1095                    fix="The model cannot have more than one field with "
1096                    "'primary_key=True'.",
1097                    obj=cls,
1098                    id="models.multiple_primary_keys",
1099                )
1100            )
1101        return errors
1102
1103    @classmethod
1104    def _check_indexes(cls) -> list[PreflightResult]:
1105        """Check fields, names, and conditions of indexes."""
1106        errors = []
1107        references = set()
1108        for index in cls.model_options.indexes:
1109            # Index name can't start with an underscore or a number, restricted
1110            # for cross-database compatibility with Oracle.
1111            if index.name[0] == "_" or index.name[0].isdigit():
1112                errors.append(
1113                    PreflightResult(
1114                        fix=f"The index name '{index.name}' cannot start with an underscore "
1115                        "or a number.",
1116                        obj=cls,
1117                        id="models.index_name_invalid_start",
1118                    ),
1119                )
1120            if len(index.name) > index.max_name_length:
1121                errors.append(
1122                    PreflightResult(
1123                        fix="The index name '%s' cannot be longer than %d "  # noqa: UP031
1124                        "characters." % (index.name, index.max_name_length),
1125                        obj=cls,
1126                        id="models.index_name_too_long",
1127                    ),
1128                )
1129            if index.contains_expressions:
1130                for expression in index.expressions:
1131                    references.update(
1132                        ref[0] for ref in cls._get_expr_references(expression)
1133                    )
1134        if not (
1135            db_connection.features.supports_partial_indexes
1136            or "supports_partial_indexes" in cls.model_options.required_db_features
1137        ) and any(index.condition is not None for index in cls.model_options.indexes):
1138            errors.append(
1139                PreflightResult(
1140                    fix=f"{db_connection.display_name} does not support indexes with conditions. "
1141                    "Conditions will be ignored. Silence this warning "
1142                    "if you don't care about it.",
1143                    warning=True,
1144                    obj=cls,
1145                    id="models.index_conditions_ignored",
1146                )
1147            )
1148        if not (
1149            db_connection.features.supports_covering_indexes
1150            or "supports_covering_indexes" in cls.model_options.required_db_features
1151        ) and any(index.include for index in cls.model_options.indexes):
1152            errors.append(
1153                PreflightResult(
1154                    fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
1155                    "Non-key columns will be ignored. Silence this "
1156                    "warning if you don't care about it.",
1157                    warning=True,
1158                    obj=cls,
1159                    id="models.index_non_key_columns_ignored",
1160                )
1161            )
1162        if not (
1163            db_connection.features.supports_expression_indexes
1164            or "supports_expression_indexes" in cls.model_options.required_db_features
1165        ) and any(index.contains_expressions for index in cls.model_options.indexes):
1166            errors.append(
1167                PreflightResult(
1168                    fix=f"{db_connection.display_name} does not support indexes on expressions. "
1169                    "An index won't be created. Silence this warning "
1170                    "if you don't care about it.",
1171                    warning=True,
1172                    obj=cls,
1173                    id="models.index_on_foreign_key",
1174                )
1175            )
1176        fields = [
1177            field
1178            for index in cls.model_options.indexes
1179            for field, _ in index.fields_orders
1180        ]
1181        fields += [
1182            include for index in cls.model_options.indexes for include in index.include
1183        ]
1184        fields += references
1185        errors.extend(cls._check_local_fields(fields, "indexes"))
1186        return errors
1187
1188    @classmethod
1189    def _check_local_fields(
1190        cls, fields: Iterable[str], option: str
1191    ) -> list[PreflightResult]:
1192        from plain.models.fields.reverse_related import ManyToManyRel
1193
1194        # In order to avoid hitting the relation tree prematurely, we use our
1195        # own fields_map instead of using get_field()
1196        forward_fields_map = {}
1197        for field in cls._model_meta._get_fields(reverse=False):
1198            forward_fields_map[field.name] = field
1199            if hasattr(field, "attname"):
1200                forward_fields_map[field.attname] = field
1201
1202        errors = []
1203        for field_name in fields:
1204            try:
1205                field = forward_fields_map[field_name]
1206            except KeyError:
1207                errors.append(
1208                    PreflightResult(
1209                        fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1210                        obj=cls,
1211                        id="models.nonexistent_field_reference",
1212                    )
1213                )
1214            else:
1215                if isinstance(field.remote_field, ManyToManyRel):
1216                    errors.append(
1217                        PreflightResult(
1218                            fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1219                            f"ManyToManyFields are not permitted in '{option}'.",
1220                            obj=cls,
1221                            id="models.m2m_field_in_meta_option",
1222                        )
1223                    )
1224                elif field not in cls._model_meta.local_fields:
1225                    errors.append(
1226                        PreflightResult(
1227                            fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1228                            f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1229                            obj=cls,
1230                            id="models.non_local_field_reference",
1231                        )
1232                    )
1233        return errors
1234
1235    @classmethod
1236    def _check_ordering(cls) -> list[PreflightResult]:
1237        """
1238        Check "ordering" option -- is it a list of strings and do all fields
1239        exist?
1240        """
1241
1242        if not cls.model_options.ordering:
1243            return []
1244
1245        if not isinstance(cls.model_options.ordering, list | tuple):
1246            return [
1247                PreflightResult(
1248                    fix="'ordering' must be a tuple or list (even if you want to order by "
1249                    "only one field).",
1250                    obj=cls,
1251                    id="models.ordering_not_tuple_or_list",
1252                )
1253            ]
1254
1255        errors = []
1256        fields = cls.model_options.ordering
1257
1258        # Skip expressions and '?' fields.
1259        fields = (f for f in fields if isinstance(f, str) and f != "?")
1260
1261        # Convert "-field" to "field".
1262        fields = (f.removeprefix("-") for f in fields)
1263
1264        # Separate related fields and non-related fields.
1265        _fields = []
1266        related_fields = []
1267        for f in fields:
1268            if LOOKUP_SEP in f:
1269                related_fields.append(f)
1270            else:
1271                _fields.append(f)
1272        fields = _fields
1273
1274        # Check related fields.
1275        for field in related_fields:
1276            _cls = cls
1277            fld = None
1278            for part in field.split(LOOKUP_SEP):
1279                try:
1280                    fld = _cls._model_meta.get_field(part)
1281                    if fld.is_relation:
1282                        _cls = fld.path_infos[-1].to_meta.model
1283                    else:
1284                        _cls = None
1285                except (FieldDoesNotExist, AttributeError):
1286                    if fld is None or (
1287                        fld.get_transform(part) is None and fld.get_lookup(part) is None
1288                    ):
1289                        errors.append(
1290                            PreflightResult(
1291                                fix="'ordering' refers to the nonexistent field, "
1292                                f"related field, or lookup '{field}'.",
1293                                obj=cls,
1294                                id="models.ordering_nonexistent_field",
1295                            )
1296                        )
1297
1298        # Check for invalid or nonexistent fields in ordering.
1299        invalid_fields = []
1300
1301        # Any field name that is not present in field_names does not exist.
1302        # Also, ordering by m2m fields is not allowed.
1303        meta = cls._model_meta
1304        valid_fields = set(
1305            chain.from_iterable(
1306                (f.name, f.attname)
1307                if not (f.auto_created and not f.concrete)
1308                else (f.field.related_query_name(),)
1309                for f in chain(meta.fields, meta.related_objects)
1310            )
1311        )
1312
1313        invalid_fields.extend(set(fields) - valid_fields)
1314
1315        for invalid_field in invalid_fields:
1316            errors.append(
1317                PreflightResult(
1318                    fix="'ordering' refers to the nonexistent field, related "
1319                    f"field, or lookup '{invalid_field}'.",
1320                    obj=cls,
1321                    id="models.ordering_nonexistent_field",
1322                )
1323            )
1324        return errors
1325
1326    @classmethod
1327    def _check_long_column_names(cls) -> list[PreflightResult]:
1328        """
1329        Check that any auto-generated column names are shorter than the limits
1330        for each database in which the model will be created.
1331        """
1332        errors = []
1333        allowed_len = None
1334
1335        max_name_length = db_connection.ops.max_name_length()
1336        if max_name_length is not None and not db_connection.features.truncates_names:
1337            allowed_len = max_name_length
1338
1339        if allowed_len is None:
1340            return errors
1341
1342        for f in cls._model_meta.local_fields:
1343            _, column_name = f.get_attname_column()
1344
1345            # Check if auto-generated name for the field is too long
1346            # for the database.
1347            if (
1348                f.db_column is None
1349                and column_name is not None
1350                and len(column_name) > allowed_len
1351            ):
1352                errors.append(
1353                    PreflightResult(
1354                        fix=f'Autogenerated column name too long for field "{column_name}". '
1355                        f'Maximum length is "{allowed_len}" for the database. '
1356                        "Set the column name manually using 'db_column'.",
1357                        obj=cls,
1358                        id="models.autogenerated_column_name_too_long",
1359                    )
1360                )
1361
1362        for f in cls._model_meta.local_many_to_many:
1363            # Skip nonexistent models.
1364            if isinstance(f.remote_field.through, str):
1365                continue
1366
1367            # Check if auto-generated name for the M2M field is too long
1368            # for the database.
1369            for m2m in f.remote_field.through._model_meta.local_fields:
1370                _, rel_name = m2m.get_attname_column()
1371                if (
1372                    m2m.db_column is None
1373                    and rel_name is not None
1374                    and len(rel_name) > allowed_len
1375                ):
1376                    errors.append(
1377                        PreflightResult(
1378                            fix="Autogenerated column name too long for M2M field "
1379                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
1380                            "Use 'through' to create a separate model for "
1381                            "M2M and then set column_name using 'db_column'.",
1382                            obj=cls,
1383                            id="models.m2m_column_name_too_long",
1384                        )
1385                    )
1386
1387        return errors
1388
1389    @classmethod
1390    def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1391        if isinstance(expr, Q):
1392            for child in expr.children:
1393                if isinstance(child, tuple):
1394                    lookup, value = child
1395                    yield tuple(lookup.split(LOOKUP_SEP))
1396                    yield from cls._get_expr_references(value)
1397                else:
1398                    yield from cls._get_expr_references(child)
1399        elif isinstance(expr, F):
1400            yield tuple(expr.name.split(LOOKUP_SEP))
1401        elif hasattr(expr, "get_source_expressions"):
1402            for src_expr in expr.get_source_expressions():
1403                yield from cls._get_expr_references(src_expr)
1404
1405    @classmethod
1406    def _check_constraints(cls) -> list[PreflightResult]:
1407        errors = []
1408        if not (
1409            db_connection.features.supports_table_check_constraints
1410            or "supports_table_check_constraints"
1411            in cls.model_options.required_db_features
1412        ) and any(
1413            isinstance(constraint, CheckConstraint)
1414            for constraint in cls.model_options.constraints
1415        ):
1416            errors.append(
1417                PreflightResult(
1418                    fix=f"{db_connection.display_name} does not support check constraints. "
1419                    "A constraint won't be created. Silence this "
1420                    "warning if you don't care about it.",
1421                    obj=cls,
1422                    id="models.constraint_on_non_db_field",
1423                    warning=True,
1424                )
1425            )
1426
1427        if not (
1428            db_connection.features.supports_partial_indexes
1429            or "supports_partial_indexes" in cls.model_options.required_db_features
1430        ) and any(
1431            isinstance(constraint, UniqueConstraint)
1432            and constraint.condition is not None
1433            for constraint in cls.model_options.constraints
1434        ):
1435            errors.append(
1436                PreflightResult(
1437                    fix=f"{db_connection.display_name} does not support unique constraints with "
1438                    "conditions. A constraint won't be created. Silence this "
1439                    "warning if you don't care about it.",
1440                    obj=cls,
1441                    id="models.constraint_on_virtual_field",
1442                    warning=True,
1443                )
1444            )
1445
1446        if not (
1447            db_connection.features.supports_deferrable_unique_constraints
1448            or "supports_deferrable_unique_constraints"
1449            in cls.model_options.required_db_features
1450        ) and any(
1451            isinstance(constraint, UniqueConstraint)
1452            and constraint.deferrable is not None
1453            for constraint in cls.model_options.constraints
1454        ):
1455            errors.append(
1456                PreflightResult(
1457                    fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
1458                    "A constraint won't be created. Silence this "
1459                    "warning if you don't care about it.",
1460                    obj=cls,
1461                    id="models.constraint_on_foreign_key",
1462                    warning=True,
1463                )
1464            )
1465
1466        if not (
1467            db_connection.features.supports_covering_indexes
1468            or "supports_covering_indexes" in cls.model_options.required_db_features
1469        ) and any(
1470            isinstance(constraint, UniqueConstraint) and constraint.include
1471            for constraint in cls.model_options.constraints
1472        ):
1473            errors.append(
1474                PreflightResult(
1475                    fix=f"{db_connection.display_name} does not support unique constraints with non-key "
1476                    "columns. A constraint won't be created. Silence this "
1477                    "warning if you don't care about it.",
1478                    obj=cls,
1479                    id="models.constraint_on_m2m_field",
1480                    warning=True,
1481                )
1482            )
1483
1484        if not (
1485            db_connection.features.supports_expression_indexes
1486            or "supports_expression_indexes" in cls.model_options.required_db_features
1487        ) and any(
1488            isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
1489            for constraint in cls.model_options.constraints
1490        ):
1491            errors.append(
1492                PreflightResult(
1493                    fix=f"{db_connection.display_name} does not support unique constraints on "
1494                    "expressions. A constraint won't be created. Silence this "
1495                    "warning if you don't care about it.",
1496                    obj=cls,
1497                    id="models.constraint_on_self_referencing_fk",
1498                    warning=True,
1499                )
1500            )
1501        fields = set(
1502            chain.from_iterable(
1503                (*constraint.fields, *constraint.include)
1504                for constraint in cls.model_options.constraints
1505                if isinstance(constraint, UniqueConstraint)
1506            )
1507        )
1508        references = set()
1509        for constraint in cls.model_options.constraints:
1510            if isinstance(constraint, UniqueConstraint):
1511                if (
1512                    db_connection.features.supports_partial_indexes
1513                    or "supports_partial_indexes"
1514                    not in cls.model_options.required_db_features
1515                ) and isinstance(constraint.condition, Q):
1516                    references.update(cls._get_expr_references(constraint.condition))
1517                if (
1518                    db_connection.features.supports_expression_indexes
1519                    or "supports_expression_indexes"
1520                    not in cls.model_options.required_db_features
1521                ) and constraint.contains_expressions:
1522                    for expression in constraint.expressions:
1523                        references.update(cls._get_expr_references(expression))
1524            elif isinstance(constraint, CheckConstraint):
1525                if (
1526                    db_connection.features.supports_table_check_constraints
1527                    or "supports_table_check_constraints"
1528                    not in cls.model_options.required_db_features
1529                ):
1530                    if isinstance(constraint.check, Q):
1531                        references.update(cls._get_expr_references(constraint.check))
1532                    if any(
1533                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
1534                    ):
1535                        errors.append(
1536                            PreflightResult(
1537                                fix=f"Check constraint {constraint.name!r} contains "
1538                                f"RawSQL() expression and won't be validated "
1539                                f"during the model full_clean(). "
1540                                "Silence this warning if you don't care about it.",
1541                                warning=True,
1542                                obj=cls,
1543                                id="models.constraint_name_collision_autogenerated",
1544                            ),
1545                        )
1546        for field_name, *lookups in references:
1547            fields.add(field_name)
1548            if not lookups:
1549                # If it has no lookups it cannot result in a JOIN.
1550                continue
1551            try:
1552                field = cls._model_meta.get_field(field_name)
1553                if not field.is_relation or field.many_to_many or field.one_to_many:
1554                    continue
1555            except FieldDoesNotExist:
1556                continue
1557            # JOIN must happen at the first lookup.
1558            first_lookup = lookups[0]
1559            if (
1560                hasattr(field, "get_transform")
1561                and hasattr(field, "get_lookup")
1562                and field.get_transform(first_lookup) is None
1563                and field.get_lookup(first_lookup) is None
1564            ):
1565                errors.append(
1566                    PreflightResult(
1567                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1568                        obj=cls,
1569                        id="models.constraint_refers_to_joined_field",
1570                    )
1571                )
1572        errors.extend(cls._check_local_fields(fields, "constraints"))
1573        return errors
1574
1575
1576########
1577# MISC #
1578########
1579
1580
1581def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
1582    """Used to unpickle Model subclasses with deferred fields."""
1583    if isinstance(model_id, tuple):
1584        model = models_registry.get_model(*model_id)
1585    else:
1586        # Backwards compat - the model was cached directly in earlier versions.
1587        model = model_id
1588    return model.__new__(model)
1589
1590
1591model_unpickle.__safe_for_unpickle__ = True  # type: ignore[attr-defined]