Plain is headed towards 1.0! Subscribe for development updates →

   1import copy
   2import inspect
   3import warnings
   4from itertools import chain
   5
   6import plain.runtime
   7from plain import preflight
   8from plain.exceptions import (
   9    NON_FIELD_ERRORS,
  10    FieldDoesNotExist,
  11    MultipleObjectsReturned,
  12    ObjectDoesNotExist,
  13    ValidationError,
  14)
  15from plain.models import models_registry, transaction
  16from plain.models.constants import LOOKUP_SEP
  17from plain.models.constraints import CheckConstraint, UniqueConstraint
  18from plain.models.db import (
  19    PLAIN_VERSION_PICKLE_KEY,
  20    DatabaseError,
  21    db_connection,
  22)
  23from plain.models.deletion import Collector
  24from plain.models.expressions import RawSQL, Value
  25from plain.models.fields import NOT_PROVIDED
  26from plain.models.fields.reverse_related import ForeignObjectRel
  27from plain.models.options import Options
  28from plain.models.query import F, Q, QuerySet
  29from plain.packages import packages_registry
  30from plain.utils.encoding import force_str
  31from plain.utils.hashable import make_hashable
  32
  33
  34class Deferred:
  35    def __repr__(self):
  36        return "<Deferred field>"
  37
  38    def __str__(self):
  39        return "<Deferred field>"
  40
  41
# Shared sentinel standing in for a field value that was not loaded.
# Model.from_db() substitutes it for missing values and Model.__init__()
# skips any value that ``is`` this object, so the same instance must be reused.
DEFERRED = Deferred()
  43
  44
  45def _has_contribute_to_class(value):
  46    # Only call contribute_to_class() if it's bound.
  47    return not inspect.isclass(value) and hasattr(value, "contribute_to_class")
  48
  49
class ModelBase(type):
    """Metaclass for all models.

    On creation of each model class it installs ``_meta``, the per-model
    exception classes, and lets every attribute exposing
    ``contribute_to_class`` attach itself to the new class.
    """

    def __new__(cls, name, bases, attrs, **kwargs):
        """Create and fully set up a new model class.

        Raises TypeError if a base is a subclass of Model other than Model
        itself, or if any base defines a ``Meta`` attribute.
        """
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models must inherit directly from Model, not from a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )
            # Meta has to be defined on the model itself.
            if hasattr(base, "Meta"):
                raise TypeError(
                    "Meta can only be defined on a model itself, not a parent class: "
                    f"{name} extends {base}"
                )

        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        # _meta must exist before any attribute contributes itself below.
        new_class._setup_meta()
        new_class._add_exceptions()

        # Now go back over all the attrs on this class and see if they have a
        # contribute_to_class() method.
        # Attributes with contribute_to_class are fields and meta options.
        for attr_name, attr_value in inspect.getmembers(new_class):
            # Names with a leading underscore (including _meta) are skipped.
            if attr_name.startswith("_"):
                continue

            if _has_contribute_to_class(attr_value):
                if attr_name not in attrs:
                    # If the field came from an inherited class/mixin,
                    # we need to make a copy of it to avoid altering the
                    # original class and other classes that inherit from it.
                    field = copy.deepcopy(attr_value)
                else:
                    field = attr_value
                new_class.add_to_class(attr_name, field)

        new_class._meta.concrete_model = new_class

        # Copy indexes so that index names are unique when models extend another class.
        new_class._meta.indexes = [
            copy.deepcopy(idx) for idx in new_class._meta.indexes
        ]

        new_class._prepare()

        return new_class

    def add_to_class(cls, name, value):
        """Attach *value* to the class under *name*.

        Delegates to ``value.contribute_to_class(cls, name)`` when *value* is
        a non-class object providing that hook; otherwise does a plain
        ``setattr``.
        """
        if _has_contribute_to_class(value):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

    def _setup_meta(cls):
        """Build the Options object for this class and install it as ``_meta``.

        The package label comes from ``Meta.package_label`` when set,
        otherwise from the package config that contains the class's module.
        Raises RuntimeError when neither source provides a label.
        """
        name = cls.__name__
        module = cls.__module__

        # The model's Meta class, if it has one.
        meta = getattr(cls, "Meta", None)

        # Look for an application configuration to attach the model to.
        package_config = packages_registry.get_containing_package_config(module)

        package_label = getattr(meta, "package_label", None)
        if package_label is None:
            if package_config is None:
                raise RuntimeError(
                    f"Model class {module}.{name} doesn't declare an explicit "
                    "package_label and isn't in an application in "
                    "INSTALLED_PACKAGES."
                )
            else:
                package_label = package_config.package_label

        cls.add_to_class("_meta", Options(meta, package_label))

    def _add_exceptions(cls):
        """Give the class its own DoesNotExist / MultipleObjectsReturned types.

        Each is a fresh subclass of the generic exception, with ``__module__``
        and ``__qualname__`` set so it appears nested under this model class.
        """
        cls.DoesNotExist = type(
            "DoesNotExist",
            (ObjectDoesNotExist,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.DoesNotExist",
            },
        )

        cls.MultipleObjectsReturned = type(
            "MultipleObjectsReturned",
            (MultipleObjectsReturned,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.MultipleObjectsReturned",
            },
        )

    def _prepare(cls):
        """Create some methods once self._meta has been populated."""
        opts = cls._meta
        opts._prepare(cls)

        # Give the class a docstring -- its definition.
        # The generated form is "ClassName(field1, field2, ...)".
        if cls.__doc__ is None:
            cls.__doc__ = "{}({})".format(
                cls.__name__,
                ", ".join(f.name for f in opts.fields),
            )

        # Set the name of _meta.indexes. This can't be done in
        # Options.contribute_to_class() because fields haven't been added to
        # the model at that point.
        for index in cls._meta.indexes:
            if not index.name:
                index.set_name_with_model(cls)

    @property
    def query(cls) -> QuerySet:
        """Create a new QuerySet for this model."""
        # Defined on the metaclass, so it is read as ``SomeModel.query``.
        return cls._meta.queryset
 174
 175
 176class ModelStateFieldsCacheDescriptor:
 177    def __get__(self, instance, cls=None):
 178        if instance is None:
 179            return self
 180        res = instance.fields_cache = {}
 181        return res
 182
 183
 184class ModelState:
 185    """Store model instance state."""
 186
 187    # If true, uniqueness validation checks will consider this a new, unsaved
 188    # object. Necessary for correct validation of new instances of objects with
 189    # explicit (non-auto) PKs. This impacts validation only; it has no effect
 190    # on the actual save.
 191    adding = True
 192    fields_cache = ModelStateFieldsCacheDescriptor()
 193
 194
 195class Model(metaclass=ModelBase):
 196    DoesNotExist: type[ObjectDoesNotExist]
 197    MultipleObjectsReturned: type[MultipleObjectsReturned]
 198
    def __init__(self, *args, **kwargs):
        """Populate a new instance from positional and/or keyword field values.

        Positional values are matched in order against
        ``_meta.concrete_fields`` when no keyword arguments are given, and
        against ``_meta.fields`` otherwise. Fields not covered positionally
        take their value from ``kwargs`` or from ``field.get_default()``.
        Any value that is the DEFERRED sentinel is skipped, leaving the
        attribute unset on the instance. Keyword arguments left over after
        the field pass must name an entry in ``_meta._property_names`` or a
        field known to ``_meta.get_field()``.

        Raises:
            IndexError: more positional values than concrete fields.
            TypeError: a field was given both positionally and by keyword,
                or unexpected keyword arguments remain.
        """
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        opts = self._meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work
        # The reason for the kwargs check is that standard iterator passes in by
        # args, and instantiation for iteration is 33% faster.
        if len(args) > len(opts.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            fields_iter = iter(opts.concrete_fields)
            # The ordering of the zip calls matter - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the second
            # is *not* consumed. We rely on this, so don't change the order
            # without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(opts.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                # Popping also removes the name from kwargs so it is not
                # processed again below.
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        # fields_iter was partially consumed by the zip() above, so this loop
        # only sees the fields that received no positional value.
        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            property_names = opts._property_names
            # Collect every offending name so they are reported together.
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        opts.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()
 307
 308    @classmethod
 309    def from_db(cls, field_names, values):
 310        if len(values) != len(cls._meta.concrete_fields):
 311            values_iter = iter(values)
 312            values = [
 313                next(values_iter) if f.attname in field_names else DEFERRED
 314                for f in cls._meta.concrete_fields
 315            ]
 316        new = cls(*values)
 317        new._state.adding = False
 318        return new
 319
 320    def __repr__(self):
 321        return f"<{self.__class__.__name__}: {self}>"
 322
 323    def __str__(self):
 324        return f"{self.__class__.__name__} object ({self.id})"
 325
 326    def __eq__(self, other):
 327        if not isinstance(other, Model):
 328            return NotImplemented
 329        if self._meta.concrete_model != other._meta.concrete_model:
 330            return False
 331        my_id = self.id
 332        if my_id is None:
 333            return self is other
 334        return my_id == other.id
 335
 336    def __hash__(self):
 337        if self.id is None:
 338            raise TypeError("Model instances without primary key value are unhashable")
 339        return hash(self.id)
 340
 341    def __reduce__(self):
 342        data = self.__getstate__()
 343        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
 344        class_id = self._meta.package_label, self._meta.object_name
 345        return model_unpickle, (class_id,), data
 346
 347    def __getstate__(self):
 348        """Hook to allow choosing the attributes to pickle."""
 349        state = self.__dict__.copy()
 350        state["_state"] = copy.copy(state["_state"])
 351        state["_state"].fields_cache = state["_state"].fields_cache.copy()
 352        # memoryview cannot be pickled, so cast it to bytes and store
 353        # separately.
 354        _memoryview_attrs = []
 355        for attr, value in state.items():
 356            if isinstance(value, memoryview):
 357                _memoryview_attrs.append((attr, bytes(value)))
 358        if _memoryview_attrs:
 359            state["_memoryview_attrs"] = _memoryview_attrs
 360            for attr, value in _memoryview_attrs:
 361                state.pop(attr)
 362        return state
 363
 364    def __setstate__(self, state):
 365        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
 366        if pickled_version:
 367            if pickled_version != plain.runtime.__version__:
 368                warnings.warn(
 369                    f"Pickled model instance's Plain version {pickled_version} does not "
 370                    f"match the current version {plain.runtime.__version__}.",
 371                    RuntimeWarning,
 372                    stacklevel=2,
 373                )
 374        else:
 375            warnings.warn(
 376                "Pickled model instance's Plain version is not specified.",
 377                RuntimeWarning,
 378                stacklevel=2,
 379            )
 380        if "_memoryview_attrs" in state:
 381            for attr, value in state.pop("_memoryview_attrs"):
 382                state[attr] = memoryview(value)
 383        self.__dict__.update(state)
 384
 385    def get_deferred_fields(self):
 386        """
 387        Return a set containing names of deferred fields for this instance.
 388        """
 389        return {
 390            f.attname
 391            for f in self._meta.concrete_fields
 392            if f.attname not in self.__dict__
 393        }
 394
 395    def refresh_from_db(self, fields=None):
 396        """
 397        Reload field values from the database.
 398
 399        By default, the reloading happens from the database this instance was
 400        loaded from, or by the read router if this instance wasn't loaded from
 401        any database. The using parameter will override the default.
 402
 403        Fields can be used to specify which fields to reload. The fields
 404        should be an iterable of field attnames. If fields is None, then
 405        all non-deferred fields are reloaded.
 406
 407        When accessing deferred fields of an instance, the deferred loading
 408        of the field will call this method.
 409        """
 410        if fields is None:
 411            self._prefetched_objects_cache = {}
 412        else:
 413            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
 414            for field in fields:
 415                if field in prefetched_objects_cache:
 416                    del prefetched_objects_cache[field]
 417                    fields.remove(field)
 418            if not fields:
 419                return
 420            if any(LOOKUP_SEP in f for f in fields):
 421                raise ValueError(
 422                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
 423                    "are not allowed in fields."
 424                )
 425
 426        db_instance_qs = self.__class__._meta.base_queryset.filter(id=self.id)
 427
 428        # Use provided fields, if not set then reload all non-deferred fields.
 429        deferred_fields = self.get_deferred_fields()
 430        if fields is not None:
 431            fields = list(fields)
 432            db_instance_qs = db_instance_qs.only(*fields)
 433        elif deferred_fields:
 434            fields = [
 435                f.attname
 436                for f in self._meta.concrete_fields
 437                if f.attname not in deferred_fields
 438            ]
 439            db_instance_qs = db_instance_qs.only(*fields)
 440
 441        db_instance = db_instance_qs.get()
 442        non_loaded_fields = db_instance.get_deferred_fields()
 443        for field in self._meta.concrete_fields:
 444            if field.attname in non_loaded_fields:
 445                # This field wasn't refreshed - skip ahead.
 446                continue
 447            setattr(self, field.attname, getattr(db_instance, field.attname))
 448            # Clear cached foreign keys.
 449            if field.is_relation and field.is_cached(self):
 450                field.delete_cached_value(self)
 451
 452        # Clear cached relations.
 453        for field in self._meta.related_objects:
 454            if field.is_cached(self):
 455                field.delete_cached_value(self)
 456
 457    def serializable_value(self, field_name):
 458        """
 459        Return the value of the field name for this instance. If the field is
 460        a foreign key, return the id value instead of the object. If there's
 461        no Field object with this name on the model, return the model
 462        attribute's value.
 463
 464        Used to serialize a field's value (in the serializer, or form output,
 465        for example). Normally, you would just access the attribute directly
 466        and not use this method.
 467        """
 468        try:
 469            field = self._meta.get_field(field_name)
 470        except FieldDoesNotExist:
 471            return getattr(self, field_name)
 472        return getattr(self, field.attname)
 473
 474    def save(
 475        self,
 476        *,
 477        clean_and_validate=True,
 478        force_insert=False,
 479        force_update=False,
 480        update_fields=None,
 481    ):
 482        """
 483        Save the current instance. Override this in a subclass if you want to
 484        control the saving process.
 485
 486        The 'force_insert' and 'force_update' parameters can be used to insist
 487        that the "save" must be an SQL insert or update (or equivalent for
 488        non-SQL backends), respectively. Normally, they should not be set.
 489        """
 490        self._prepare_related_fields_for_save(operation_name="save")
 491
 492        if force_insert and (force_update or update_fields):
 493            raise ValueError("Cannot force both insert and updating in model saving.")
 494
 495        deferred_fields = self.get_deferred_fields()
 496        if update_fields is not None:
 497            # If update_fields is empty, skip the save. We do also check for
 498            # no-op saves later on for inheritance cases. This bailout is
 499            # still needed for skipping signal sending.
 500            if not update_fields:
 501                return
 502
 503            update_fields = frozenset(update_fields)
 504            field_names = self._meta._non_pk_concrete_field_names
 505            non_model_fields = update_fields.difference(field_names)
 506
 507            if non_model_fields:
 508                raise ValueError(
 509                    "The following fields do not exist in this model, are m2m "
 510                    "fields, or are non-concrete fields: {}".format(
 511                        ", ".join(non_model_fields)
 512                    )
 513                )
 514
 515        # If this model is deferred, automatically do an "update_fields" save
 516        # on the loaded fields.
 517        elif not force_insert and deferred_fields:
 518            field_names = set()
 519            for field in self._meta.concrete_fields:
 520                if not field.primary_key and not hasattr(field, "through"):
 521                    field_names.add(field.attname)
 522            loaded_fields = field_names.difference(deferred_fields)
 523            if loaded_fields:
 524                update_fields = frozenset(loaded_fields)
 525
 526        if clean_and_validate:
 527            self.full_clean(exclude=deferred_fields)
 528
 529        self.save_base(
 530            force_insert=force_insert,
 531            force_update=force_update,
 532            update_fields=update_fields,
 533        )
 534
 535    def save_base(
 536        self,
 537        *,
 538        raw=False,
 539        force_insert=False,
 540        force_update=False,
 541        update_fields=None,
 542    ):
 543        """
 544        Handle the parts of saving which should be done only once per save,
 545        yet need to be done in raw saves, too. This includes some sanity
 546        checks and signal sending.
 547
 548        The 'raw' argument is telling save_base not to save any parent
 549        models and not to do any changes to the values before save. This
 550        is used by fixture loading.
 551        """
 552        assert not (force_insert and (force_update or update_fields))
 553        assert update_fields is None or update_fields
 554        cls = self.__class__
 555
 556        with transaction.mark_for_rollback_on_error():
 557            self._save_table(
 558                raw,
 559                cls,
 560                force_insert,
 561                force_update,
 562                update_fields,
 563            )
 564        # Once saved, this is no longer a to-be-added instance.
 565        self._state.adding = False
 566
    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.

        An UPDATE is attempted first when the instance has an id and no
        insert is forced; if it does not report a change, an INSERT follows.
        Returns True when the UPDATE path reported a change, False otherwise.

        Raises:
            ValueError: an update is forced (``force_update`` or
                ``update_fields``) but no id value is available.
            DatabaseError: a forced update, or a save with ``update_fields``,
                affected no rows.
        """
        # NOTE(review): cls defaults to None but is dereferenced
        # unconditionally, so callers are expected to always pass it.
        meta = cls._meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        # Restrict to the requested fields; either name or attname matches.
        if update_fields:
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            # Ask the id field for a value to use on save and store it on
            # the instance; this may still be None.
            id_field = meta.get_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_field("id").default
            and meta.get_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            # Triples of (field, None, value); with raw the stored attribute
            # is used as-is, otherwise the field's pre_save() supplies it.
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                # Leave the id column out of the INSERT when there is no
                # value for it.
                id_field = meta.get_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                # Copy the values reported by the insert back onto the
                # instance, paired positionally with returning_fields.
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated
 638
 639    def _do_update(self, base_qs, id_val, values, update_fields, forced_update):
 640        """
 641        Try to update the model. Return True if the model was updated (if an
 642        update query was done and a matching row was found in the DB).
 643        """
 644        filtered = base_qs.filter(id=id_val)
 645        if not values:
 646            # We can end up here when saving a model in inheritance chain where
 647            # update_fields doesn't target any field in current model. In that
 648            # case we just say the update succeeded. Another case ending up here
 649            # is a model with just PK - in that case check that the PK still
 650            # exists.
 651            return update_fields is not None or filtered.exists()
 652        return filtered._update(values) > 0
 653
 654    def _do_insert(self, manager, fields, returning_fields, raw):
 655        """
 656        Do an INSERT. If returning_fields is defined then this method should
 657        return the newly created data for the model.
 658        """
 659        return manager._insert(
 660            [self],
 661            fields=fields,
 662            returning_fields=returning_fields,
 663            raw=raw,
 664        )
 665
    def _prepare_related_fields_for_save(self, operation_name, fields=None):
        """Check cached related objects before a save-like operation.

        For every concrete relation field with a cached related object:
        raise ValueError if that object has no id, re-assign it when the
        local attname value is empty, and drop the cached object when its
        target attribute no longer matches the local attname value.

        ``operation_name`` only appears in the error message. ``fields``,
        when truthy, limits the check to the fields it contains.
        """
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the save would result in silent data loss.
        for field in self._meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)
 702
 703    def delete(self):
 704        if self.id is None:
 705            raise ValueError(
 706                f"{self._meta.object_name} object can't be deleted because its id attribute is set "
 707                "to None."
 708            )
 709        collector = Collector(origin=self)
 710        collector.collect([self])
 711        return collector.delete()
 712
 713    def _get_FIELD_display(self, field):
 714        value = getattr(self, field.attname)
 715        choices_dict = dict(make_hashable(field.flatchoices))
 716        # force_str() to coerce lazy strings.
 717        return force_str(
 718            choices_dict.get(make_hashable(value), value), strings_only=True
 719        )
 720
 721    def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
 722        if not self.id:
 723            raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
 724        op = "gt" if is_next else "lt"
 725        order = "" if is_next else "-"
 726        param = getattr(self, field.attname)
 727        q = Q.create([(field.name, param), (f"id__{op}", self.id)], connector=Q.AND)
 728        q = Q.create([q, (f"{field.name}__{op}", param)], connector=Q.OR)
 729        qs = (
 730            self.__class__.query.filter(**kwargs)
 731            .filter(q)
 732            .order_by(f"{order}{field.name}", f"{order}id")
 733        )
 734        try:
 735            return qs[0]
 736        except IndexError:
 737            raise self.DoesNotExist(
 738                f"{self.__class__._meta.object_name} matching query does not exist."
 739            )
 740
 741    def _get_field_value_map(self, meta, exclude=None):
 742        if exclude is None:
 743            exclude = set()
 744        meta = meta or self._meta
 745        return {
 746            field.name: Value(getattr(self, field.attname), field)
 747            for field in meta.local_concrete_fields
 748            if field.name not in exclude
 749        }
 750
 751    def prepare_database_save(self, field):
 752        if self.id is None:
 753            raise ValueError(
 754                f"Unsaved model instance {self!r} cannot be used in an ORM query."
 755            )
 756        return getattr(self, field.remote_field.get_related_field().attname)
 757
 758    def clean(self):
 759        """
 760        Hook for doing any extra model-wide validation after clean() has been
 761        called on every field by self.clean_fields. Any ValidationError raised
 762        by this method will not be associated with a particular field; it will
 763        have a special-case association with the field defined by NON_FIELD_ERRORS.
 764        """
 765        pass
 766
 767    def validate_unique(self, exclude=None):
 768        """
 769        Check unique constraints on the model and raise ValidationError if any
 770        failed.
 771        """
 772        unique_checks = self._get_unique_checks(exclude=exclude)
 773
 774        if errors := self._perform_unique_checks(unique_checks):
 775            raise ValidationError(errors)
 776
 777    def _get_unique_checks(self, exclude=None):
 778        """
 779        Return a list of checks to perform. Since validate_unique() could be
 780        called from a ModelForm, some fields may have been excluded; we can't
 781        perform a unique check on a model that is missing fields involved
 782        in that check. Fields that did not validate should also be excluded,
 783        but they need to be passed in via the exclude argument.
 784        """
 785        if exclude is None:
 786            exclude = set()
 787        unique_checks = []
 788
 789        # Gather a list of checks for fields declared as unique and add them to
 790        # the list of checks.
 791
 792        fields_with_class = [(self.__class__, self._meta.local_fields)]
 793
 794        for model_class, fields in fields_with_class:
 795            for f in fields:
 796                name = f.name
 797                if name in exclude:
 798                    continue
 799                if f.primary_key:
 800                    unique_checks.append((model_class, (name,)))
 801
 802        return unique_checks
 803
 804    def _perform_unique_checks(self, unique_checks):
 805        errors = {}
 806
 807        for model_class, unique_check in unique_checks:
 808            # Try to look up an existing object with the same values as this
 809            # object's values for all the unique field.
 810
 811            lookup_kwargs = {}
 812            for field_name in unique_check:
 813                f = self._meta.get_field(field_name)
 814                lookup_value = getattr(self, f.attname)
 815                # TODO: Handle multiple backends with different feature flags.
 816                if lookup_value is None:
 817                    # no value, skip the lookup
 818                    continue
 819                if f.primary_key and not self._state.adding:
 820                    # no need to check for unique primary key when editing
 821                    continue
 822                lookup_kwargs[str(field_name)] = lookup_value
 823
 824            # some fields were skipped, no reason to do the check
 825            if len(unique_check) != len(lookup_kwargs):
 826                continue
 827
 828            qs = model_class.query.filter(**lookup_kwargs)
 829
 830            # Exclude the current object from the query if we are editing an
 831            # instance (as opposed to creating a new one)
 832            # Use the primary key defined by model_class. In previous versions
 833            # this could differ from `self.id` due to model inheritance.
 834            model_class_id = getattr(self, "id")
 835            if not self._state.adding and model_class_id is not None:
 836                qs = qs.exclude(id=model_class_id)
 837            if qs.exists():
 838                if len(unique_check) == 1:
 839                    key = unique_check[0]
 840                else:
 841                    key = NON_FIELD_ERRORS
 842                errors.setdefault(key, []).append(
 843                    self.unique_error_message(model_class, unique_check)
 844                )
 845
 846        return errors
 847
 848    def unique_error_message(self, model_class, unique_check):
 849        opts = model_class._meta
 850
 851        params = {
 852            "model": self,
 853            "model_class": model_class,
 854            "model_name": opts.model_name,
 855            "unique_check": unique_check,
 856        }
 857
 858        if len(unique_check) == 1:
 859            field = opts.get_field(unique_check[0])
 860            params["field_label"] = field.name
 861            return ValidationError(
 862                message=field.error_messages["unique"],
 863                code="unique",
 864                params=params,
 865            )
 866        else:
 867            field_names = [opts.get_field(f).name for f in unique_check]
 868
 869            # Put an "and" before the last one
 870            field_names[-1] = f"and {field_names[-1]}"
 871
 872            if len(field_names) > 2:
 873                # Comma join if more than 2
 874                params["field_label"] = ", ".join(field_names)
 875            else:
 876                # Just a space if there are only 2
 877                params["field_label"] = " ".join(field_names)
 878
 879            # Use the first field as the message format...
 880            message = opts.get_field(unique_check[0]).error_messages["unique"]
 881
 882            return ValidationError(
 883                message=message,
 884                code="unique",
 885                params=params,
 886            )
 887
 888    def get_constraints(self):
 889        constraints = [(self.__class__, self._meta.constraints)]
 890        return constraints
 891
 892    def validate_constraints(self, exclude=None):
 893        constraints = self.get_constraints()
 894
 895        errors = {}
 896        for model_class, model_constraints in constraints:
 897            for constraint in model_constraints:
 898                try:
 899                    constraint.validate(model_class, self, exclude=exclude)
 900                except ValidationError as e:
 901                    if (
 902                        getattr(e, "code", None) == "unique"
 903                        and len(constraint.fields) == 1
 904                    ):
 905                        errors.setdefault(constraint.fields[0], []).append(e)
 906                    else:
 907                        errors = e.update_error_dict(errors)
 908        if errors:
 909            raise ValidationError(errors)
 910
 911    def full_clean(
 912        self, *, exclude=None, validate_unique=True, validate_constraints=True
 913    ):
 914        """
 915        Call clean_fields(), clean(), validate_unique(), and
 916        validate_constraints() on the model. Raise a ValidationError for any
 917        errors that occur.
 918        """
 919        errors = {}
 920        if exclude is None:
 921            exclude = set()
 922        else:
 923            exclude = set(exclude)
 924
 925        try:
 926            self.clean_fields(exclude=exclude)
 927        except ValidationError as e:
 928            errors = e.update_error_dict(errors)
 929
 930        # Form.clean() is run even if other validation fails, so do the
 931        # same with Model.clean() for consistency.
 932        try:
 933            self.clean()
 934        except ValidationError as e:
 935            errors = e.update_error_dict(errors)
 936
 937        # Run unique checks, but only for fields that passed validation.
 938        if validate_unique:
 939            for name in errors:
 940                if name != NON_FIELD_ERRORS and name not in exclude:
 941                    exclude.add(name)
 942            try:
 943                self.validate_unique(exclude=exclude)
 944            except ValidationError as e:
 945                errors = e.update_error_dict(errors)
 946
 947        # Run constraints checks, but only for fields that passed validation.
 948        if validate_constraints:
 949            for name in errors:
 950                if name != NON_FIELD_ERRORS and name not in exclude:
 951                    exclude.add(name)
 952            try:
 953                self.validate_constraints(exclude=exclude)
 954            except ValidationError as e:
 955                errors = e.update_error_dict(errors)
 956
 957        if errors:
 958            raise ValidationError(errors)
 959
 960    def clean_fields(self, exclude=None):
 961        """
 962        Clean all fields and raise a ValidationError containing a dict
 963        of all validation errors if any occur.
 964        """
 965        if exclude is None:
 966            exclude = set()
 967
 968        errors = {}
 969        for f in self._meta.fields:
 970            if f.name in exclude:
 971                continue
 972            # Skip validation for empty fields with required=False. The developer
 973            # is responsible for making sure they have a valid value.
 974            raw_value = getattr(self, f.attname)
 975            if not f.required and raw_value in f.empty_values:
 976                continue
 977            try:
 978                setattr(self, f.attname, f.clean(raw_value, self))
 979            except ValidationError as e:
 980                errors[f.name] = e.error_list
 981
 982        if errors:
 983            raise ValidationError(errors)
 984
 985    @classmethod
 986    def check(cls, **kwargs):
 987        errors = []
 988
 989        database = kwargs.get("database", False)
 990        errors += [
 991            *cls._check_fields(**kwargs),
 992            *cls._check_m2m_through_same_relationship(),
 993            *cls._check_long_column_names(database),
 994        ]
 995        clash_errors = (
 996            *cls._check_id_field(),
 997            *cls._check_field_name_clashes(),
 998            *cls._check_model_name_db_lookup_clashes(),
 999            *cls._check_property_name_related_field_accessor_clashes(),
1000            *cls._check_single_primary_key(),
1001        )
1002        errors.extend(clash_errors)
1003        # If there are field name clashes, hide consequent column name
1004        # clashes.
1005        if not clash_errors:
1006            errors.extend(cls._check_column_name_clashes())
1007        errors += [
1008            *cls._check_indexes(database),
1009            *cls._check_ordering(),
1010            *cls._check_constraints(database),
1011            *cls._check_db_table_comment(database),
1012        ]
1013
1014        return errors
1015
1016    @classmethod
1017    def _check_db_table_comment(cls, database):
1018        if not cls._meta.db_table_comment or not database:
1019            return []
1020        errors = []
1021        if not (
1022            db_connection.features.supports_comments
1023            or "supports_comments" in cls._meta.required_db_features
1024        ):
1025            errors.append(
1026                preflight.Warning(
1027                    f"{db_connection.display_name} does not support comments on "
1028                    f"tables (db_table_comment).",
1029                    obj=cls,
1030                    id="models.W046",
1031                )
1032            )
1033        return errors
1034
1035    @classmethod
1036    def _check_fields(cls, **kwargs):
1037        """Perform all field checks."""
1038        errors = []
1039        for field in cls._meta.local_fields:
1040            errors.extend(field.check(**kwargs))
1041        for field in cls._meta.local_many_to_many:
1042            errors.extend(field.check(from_model=cls, **kwargs))
1043        return errors
1044
1045    @classmethod
1046    def _check_m2m_through_same_relationship(cls):
1047        """Check if no relationship model is used by more than one m2m field."""
1048
1049        errors = []
1050        seen_intermediary_signatures = []
1051
1052        fields = cls._meta.local_many_to_many
1053
1054        # Skip when the target model wasn't found.
1055        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
1056
1057        # Skip when the relationship model wasn't found.
1058        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
1059
1060        for f in fields:
1061            signature = (
1062                f.remote_field.model,
1063                cls,
1064                f.remote_field.through,
1065                f.remote_field.through_fields,
1066            )
1067            if signature in seen_intermediary_signatures:
1068                errors.append(
1069                    preflight.Error(
1070                        "The model has two identical many-to-many relations "
1071                        f"through the intermediate model '{f.remote_field.through._meta.label}'.",
1072                        obj=cls,
1073                        id="models.E003",
1074                    )
1075                )
1076            else:
1077                seen_intermediary_signatures.append(signature)
1078        return errors
1079
1080    @classmethod
1081    def _check_id_field(cls):
1082        """Disallow user-defined fields named ``id``."""
1083        if any(
1084            f for f in cls._meta.local_fields if f.name == "id" and not f.auto_created
1085        ):
1086            return [
1087                preflight.Error(
1088                    "'id' is a reserved word that cannot be used as a field name.",
1089                    obj=cls,
1090                    id="models.E004",
1091                )
1092            ]
1093        return []
1094
1095    @classmethod
1096    def _check_field_name_clashes(cls):
1097        """Forbid field shadowing in multi-table inheritance."""
1098        errors = []
1099        used_fields = {}  # name or attname -> field
1100
1101        for f in cls._meta.local_fields:
1102            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1103            # Note that we may detect clash between user-defined non-unique
1104            # field "id" and automatically added unique field "id", both
1105            # defined at the same model. This special case is considered in
1106            # _check_id_field and here we ignore it.
1107            id_conflict = (
1108                f.name == "id" and clash and clash.name == "id" and clash.model == cls
1109            )
1110            if clash and not id_conflict:
1111                errors.append(
1112                    preflight.Error(
1113                        f"The field '{f.name}' clashes with the field '{clash.name}' "
1114                        f"from model '{clash.model._meta}'.",
1115                        obj=f,
1116                        id="models.E006",
1117                    )
1118                )
1119            used_fields[f.name] = f
1120            used_fields[f.attname] = f
1121
1122        return errors
1123
1124    @classmethod
1125    def _check_column_name_clashes(cls):
1126        # Store a list of column names which have already been used by other fields.
1127        used_column_names = []
1128        errors = []
1129
1130        for f in cls._meta.local_fields:
1131            _, column_name = f.get_attname_column()
1132
1133            # Ensure the column name is not already in use.
1134            if column_name and column_name in used_column_names:
1135                errors.append(
1136                    preflight.Error(
1137                        f"Field '{f.name}' has column name '{column_name}' that is used by "
1138                        "another field.",
1139                        hint="Specify a 'db_column' for the field.",
1140                        obj=cls,
1141                        id="models.E007",
1142                    )
1143                )
1144            else:
1145                used_column_names.append(column_name)
1146
1147        return errors
1148
1149    @classmethod
1150    def _check_model_name_db_lookup_clashes(cls):
1151        errors = []
1152        model_name = cls.__name__
1153        if model_name.startswith("_") or model_name.endswith("_"):
1154            errors.append(
1155                preflight.Error(
1156                    f"The model name '{model_name}' cannot start or end with an underscore "
1157                    "as it collides with the query lookup syntax.",
1158                    obj=cls,
1159                    id="models.E023",
1160                )
1161            )
1162        elif LOOKUP_SEP in model_name:
1163            errors.append(
1164                preflight.Error(
1165                    f"The model name '{model_name}' cannot contain double underscores as "
1166                    "it collides with the query lookup syntax.",
1167                    obj=cls,
1168                    id="models.E024",
1169                )
1170            )
1171        return errors
1172
1173    @classmethod
1174    def _check_property_name_related_field_accessor_clashes(cls):
1175        errors = []
1176        property_names = cls._meta._property_names
1177        related_field_accessors = (
1178            f.get_attname()
1179            for f in cls._meta._get_fields(reverse=False)
1180            if f.is_relation and f.related_model is not None
1181        )
1182        for accessor in related_field_accessors:
1183            if accessor in property_names:
1184                errors.append(
1185                    preflight.Error(
1186                        f"The property '{accessor}' clashes with a related field "
1187                        "accessor.",
1188                        obj=cls,
1189                        id="models.E025",
1190                    )
1191                )
1192        return errors
1193
1194    @classmethod
1195    def _check_single_primary_key(cls):
1196        errors = []
1197        if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1:
1198            errors.append(
1199                preflight.Error(
1200                    "The model cannot have more than one field with "
1201                    "'primary_key=True'.",
1202                    obj=cls,
1203                    id="models.E026",
1204                )
1205            )
1206        return errors
1207
1208    @classmethod
1209    def _check_indexes(cls, database):
1210        """Check fields, names, and conditions of indexes."""
1211        errors = []
1212        references = set()
1213        for index in cls._meta.indexes:
1214            # Index name can't start with an underscore or a number, restricted
1215            # for cross-database compatibility with Oracle.
1216            if index.name[0] == "_" or index.name[0].isdigit():
1217                errors.append(
1218                    preflight.Error(
1219                        f"The index name '{index.name}' cannot start with an underscore "
1220                        "or a number.",
1221                        obj=cls,
1222                        id="models.E033",
1223                    ),
1224                )
1225            if len(index.name) > index.max_name_length:
1226                errors.append(
1227                    preflight.Error(
1228                        "The index name '%s' cannot be longer than %d "  # noqa: UP031
1229                        "characters." % (index.name, index.max_name_length),
1230                        obj=cls,
1231                        id="models.E034",
1232                    ),
1233                )
1234            if index.contains_expressions:
1235                for expression in index.expressions:
1236                    references.update(
1237                        ref[0] for ref in cls._get_expr_references(expression)
1238                    )
1239        if (
1240            database
1241            and not (
1242                db_connection.features.supports_partial_indexes
1243                or "supports_partial_indexes" in cls._meta.required_db_features
1244            )
1245            and any(index.condition is not None for index in cls._meta.indexes)
1246        ):
1247            errors.append(
1248                preflight.Warning(
1249                    f"{db_connection.display_name} does not support indexes with conditions.",
1250                    hint=(
1251                        "Conditions will be ignored. Silence this warning "
1252                        "if you don't care about it."
1253                    ),
1254                    obj=cls,
1255                    id="models.W037",
1256                )
1257            )
1258        if (
1259            database
1260            and not (
1261                db_connection.features.supports_covering_indexes
1262                or "supports_covering_indexes" in cls._meta.required_db_features
1263            )
1264            and any(index.include for index in cls._meta.indexes)
1265        ):
1266            errors.append(
1267                preflight.Warning(
1268                    f"{db_connection.display_name} does not support indexes with non-key columns.",
1269                    hint=(
1270                        "Non-key columns will be ignored. Silence this "
1271                        "warning if you don't care about it."
1272                    ),
1273                    obj=cls,
1274                    id="models.W040",
1275                )
1276            )
1277        if (
1278            database
1279            and not (
1280                db_connection.features.supports_expression_indexes
1281                or "supports_expression_indexes" in cls._meta.required_db_features
1282            )
1283            and any(index.contains_expressions for index in cls._meta.indexes)
1284        ):
1285            errors.append(
1286                preflight.Warning(
1287                    f"{db_connection.display_name} does not support indexes on expressions.",
1288                    hint=(
1289                        "An index won't be created. Silence this warning "
1290                        "if you don't care about it."
1291                    ),
1292                    obj=cls,
1293                    id="models.W043",
1294                )
1295            )
1296        fields = [
1297            field for index in cls._meta.indexes for field, _ in index.fields_orders
1298        ]
1299        fields += [include for index in cls._meta.indexes for include in index.include]
1300        fields += references
1301        errors.extend(cls._check_local_fields(fields, "indexes"))
1302        return errors
1303
1304    @classmethod
1305    def _check_local_fields(cls, fields, option):
1306        from plain import models
1307
1308        # In order to avoid hitting the relation tree prematurely, we use our
1309        # own fields_map instead of using get_field()
1310        forward_fields_map = {}
1311        for field in cls._meta._get_fields(reverse=False):
1312            forward_fields_map[field.name] = field
1313            if hasattr(field, "attname"):
1314                forward_fields_map[field.attname] = field
1315
1316        errors = []
1317        for field_name in fields:
1318            try:
1319                field = forward_fields_map[field_name]
1320            except KeyError:
1321                errors.append(
1322                    preflight.Error(
1323                        f"'{option}' refers to the nonexistent field '{field_name}'.",
1324                        obj=cls,
1325                        id="models.E012",
1326                    )
1327                )
1328            else:
1329                if isinstance(field.remote_field, models.ManyToManyRel):
1330                    errors.append(
1331                        preflight.Error(
1332                            f"'{option}' refers to a ManyToManyField '{field_name}', but "
1333                            f"ManyToManyFields are not permitted in '{option}'.",
1334                            obj=cls,
1335                            id="models.E013",
1336                        )
1337                    )
1338                elif field not in cls._meta.local_fields:
1339                    errors.append(
1340                        preflight.Error(
1341                            f"'{option}' refers to field '{field_name}' which is not local to model "
1342                            f"'{cls._meta.object_name}'.",
1343                            hint="This issue may be caused by multi-table inheritance.",
1344                            obj=cls,
1345                            id="models.E016",
1346                        )
1347                    )
1348        return errors
1349
1350    @classmethod
1351    def _check_ordering(cls):
1352        """
1353        Check "ordering" option -- is it a list of strings and do all fields
1354        exist?
1355        """
1356
1357        if not cls._meta.ordering:
1358            return []
1359
1360        if not isinstance(cls._meta.ordering, list | tuple):
1361            return [
1362                preflight.Error(
1363                    "'ordering' must be a tuple or list (even if you want to order by "
1364                    "only one field).",
1365                    obj=cls,
1366                    id="models.E014",
1367                )
1368            ]
1369
1370        errors = []
1371        fields = cls._meta.ordering
1372
1373        # Skip expressions and '?' fields.
1374        fields = (f for f in fields if isinstance(f, str) and f != "?")
1375
1376        # Convert "-field" to "field".
1377        fields = (f.removeprefix("-") for f in fields)
1378
1379        # Separate related fields and non-related fields.
1380        _fields = []
1381        related_fields = []
1382        for f in fields:
1383            if LOOKUP_SEP in f:
1384                related_fields.append(f)
1385            else:
1386                _fields.append(f)
1387        fields = _fields
1388
1389        # Check related fields.
1390        for field in related_fields:
1391            _cls = cls
1392            fld = None
1393            for part in field.split(LOOKUP_SEP):
1394                try:
1395                    fld = _cls._meta.get_field(part)
1396                    if fld.is_relation:
1397                        _cls = fld.path_infos[-1].to_opts.model
1398                    else:
1399                        _cls = None
1400                except (FieldDoesNotExist, AttributeError):
1401                    if fld is None or (
1402                        fld.get_transform(part) is None and fld.get_lookup(part) is None
1403                    ):
1404                        errors.append(
1405                            preflight.Error(
1406                                "'ordering' refers to the nonexistent field, "
1407                                f"related field, or lookup '{field}'.",
1408                                obj=cls,
1409                                id="models.E015",
1410                            )
1411                        )
1412
1413        # Check for invalid or nonexistent fields in ordering.
1414        invalid_fields = []
1415
1416        # Any field name that is not present in field_names does not exist.
1417        # Also, ordering by m2m fields is not allowed.
1418        opts = cls._meta
1419        valid_fields = set(
1420            chain.from_iterable(
1421                (f.name, f.attname)
1422                if not (f.auto_created and not f.concrete)
1423                else (f.field.related_query_name(),)
1424                for f in chain(opts.fields, opts.related_objects)
1425            )
1426        )
1427
1428        invalid_fields.extend(set(fields) - valid_fields)
1429
1430        for invalid_field in invalid_fields:
1431            errors.append(
1432                preflight.Error(
1433                    "'ordering' refers to the nonexistent field, related "
1434                    f"field, or lookup '{invalid_field}'.",
1435                    obj=cls,
1436                    id="models.E015",
1437                )
1438            )
1439        return errors
1440
1441    @classmethod
1442    def _check_long_column_names(cls, database):
1443        """
1444        Check that any auto-generated column names are shorter than the limits
1445        for each database in which the model will be created.
1446        """
1447        if not database:
1448            return []
1449        errors = []
1450        allowed_len = None
1451
1452        max_name_length = db_connection.ops.max_name_length()
1453        if max_name_length is not None and not db_connection.features.truncates_names:
1454            allowed_len = max_name_length
1455
1456        if allowed_len is None:
1457            return errors
1458
1459        for f in cls._meta.local_fields:
1460            _, column_name = f.get_attname_column()
1461
1462            # Check if auto-generated name for the field is too long
1463            # for the database.
1464            if (
1465                f.db_column is None
1466                and column_name is not None
1467                and len(column_name) > allowed_len
1468            ):
1469                errors.append(
1470                    preflight.Error(
1471                        f'Autogenerated column name too long for field "{column_name}". '
1472                        f'Maximum length is "{allowed_len}" for the database.',
1473                        hint="Set the column name manually using 'db_column'.",
1474                        obj=cls,
1475                        id="models.E018",
1476                    )
1477                )
1478
1479        for f in cls._meta.local_many_to_many:
1480            # Skip nonexistent models.
1481            if isinstance(f.remote_field.through, str):
1482                continue
1483
1484            # Check if auto-generated name for the M2M field is too long
1485            # for the database.
1486            for m2m in f.remote_field.through._meta.local_fields:
1487                _, rel_name = m2m.get_attname_column()
1488                if (
1489                    m2m.db_column is None
1490                    and rel_name is not None
1491                    and len(rel_name) > allowed_len
1492                ):
1493                    errors.append(
1494                        preflight.Error(
1495                            "Autogenerated column name too long for M2M field "
1496                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
1497                            hint=(
1498                                "Use 'through' to create a separate model for "
1499                                "M2M and then set column_name using 'db_column'."
1500                            ),
1501                            obj=cls,
1502                            id="models.E019",
1503                        )
1504                    )
1505
1506        return errors
1507
1508    @classmethod
1509    def _get_expr_references(cls, expr):
1510        if isinstance(expr, Q):
1511            for child in expr.children:
1512                if isinstance(child, tuple):
1513                    lookup, value = child
1514                    yield tuple(lookup.split(LOOKUP_SEP))
1515                    yield from cls._get_expr_references(value)
1516                else:
1517                    yield from cls._get_expr_references(child)
1518        elif isinstance(expr, F):
1519            yield tuple(expr.name.split(LOOKUP_SEP))
1520        elif hasattr(expr, "get_source_expressions"):
1521            for src_expr in expr.get_source_expressions():
1522                yield from cls._get_expr_references(src_expr)
1523
1524    @classmethod
1525    def _check_constraints(cls, database):
1526        errors = []
1527        if database:
1528            if not (
1529                db_connection.features.supports_table_check_constraints
1530                or "supports_table_check_constraints" in cls._meta.required_db_features
1531            ) and any(
1532                isinstance(constraint, CheckConstraint)
1533                for constraint in cls._meta.constraints
1534            ):
1535                errors.append(
1536                    preflight.Warning(
1537                        f"{db_connection.display_name} does not support check constraints.",
1538                        hint=(
1539                            "A constraint won't be created. Silence this "
1540                            "warning if you don't care about it."
1541                        ),
1542                        obj=cls,
1543                        id="models.W027",
1544                    )
1545                )
1546            if not (
1547                db_connection.features.supports_partial_indexes
1548                or "supports_partial_indexes" in cls._meta.required_db_features
1549            ) and any(
1550                isinstance(constraint, UniqueConstraint)
1551                and constraint.condition is not None
1552                for constraint in cls._meta.constraints
1553            ):
1554                errors.append(
1555                    preflight.Warning(
1556                        f"{db_connection.display_name} does not support unique constraints with "
1557                        "conditions.",
1558                        hint=(
1559                            "A constraint won't be created. Silence this "
1560                            "warning if you don't care about it."
1561                        ),
1562                        obj=cls,
1563                        id="models.W036",
1564                    )
1565                )
1566            if not (
1567                db_connection.features.supports_deferrable_unique_constraints
1568                or "supports_deferrable_unique_constraints"
1569                in cls._meta.required_db_features
1570            ) and any(
1571                isinstance(constraint, UniqueConstraint)
1572                and constraint.deferrable is not None
1573                for constraint in cls._meta.constraints
1574            ):
1575                errors.append(
1576                    preflight.Warning(
1577                        f"{db_connection.display_name} does not support deferrable unique constraints.",
1578                        hint=(
1579                            "A constraint won't be created. Silence this "
1580                            "warning if you don't care about it."
1581                        ),
1582                        obj=cls,
1583                        id="models.W038",
1584                    )
1585                )
1586            if not (
1587                db_connection.features.supports_covering_indexes
1588                or "supports_covering_indexes" in cls._meta.required_db_features
1589            ) and any(
1590                isinstance(constraint, UniqueConstraint) and constraint.include
1591                for constraint in cls._meta.constraints
1592            ):
1593                errors.append(
1594                    preflight.Warning(
1595                        f"{db_connection.display_name} does not support unique constraints with non-key "
1596                        "columns.",
1597                        hint=(
1598                            "A constraint won't be created. Silence this "
1599                            "warning if you don't care about it."
1600                        ),
1601                        obj=cls,
1602                        id="models.W039",
1603                    )
1604                )
1605            if not (
1606                db_connection.features.supports_expression_indexes
1607                or "supports_expression_indexes" in cls._meta.required_db_features
1608            ) and any(
1609                isinstance(constraint, UniqueConstraint)
1610                and constraint.contains_expressions
1611                for constraint in cls._meta.constraints
1612            ):
1613                errors.append(
1614                    preflight.Warning(
1615                        f"{db_connection.display_name} does not support unique constraints on "
1616                        "expressions.",
1617                        hint=(
1618                            "A constraint won't be created. Silence this "
1619                            "warning if you don't care about it."
1620                        ),
1621                        obj=cls,
1622                        id="models.W044",
1623                    )
1624                )
1625        fields = set(
1626            chain.from_iterable(
1627                (*constraint.fields, *constraint.include)
1628                for constraint in cls._meta.constraints
1629                if isinstance(constraint, UniqueConstraint)
1630            )
1631        )
1632        references = set()
1633        for constraint in cls._meta.constraints:
1634            if isinstance(constraint, UniqueConstraint):
1635                if (
1636                    db_connection.features.supports_partial_indexes
1637                    or "supports_partial_indexes" not in cls._meta.required_db_features
1638                ) and isinstance(constraint.condition, Q):
1639                    references.update(cls._get_expr_references(constraint.condition))
1640                if (
1641                    db_connection.features.supports_expression_indexes
1642                    or "supports_expression_indexes"
1643                    not in cls._meta.required_db_features
1644                ) and constraint.contains_expressions:
1645                    for expression in constraint.expressions:
1646                        references.update(cls._get_expr_references(expression))
1647            elif isinstance(constraint, CheckConstraint):
1648                if (
1649                    db_connection.features.supports_table_check_constraints
1650                    or "supports_table_check_constraints"
1651                    not in cls._meta.required_db_features
1652                ):
1653                    if isinstance(constraint.check, Q):
1654                        references.update(cls._get_expr_references(constraint.check))
1655                    if any(
1656                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
1657                    ):
1658                        errors.append(
1659                            preflight.Warning(
1660                                f"Check constraint {constraint.name!r} contains "
1661                                f"RawSQL() expression and won't be validated "
1662                                f"during the model full_clean().",
1663                                hint=(
1664                                    "Silence this warning if you don't care about it."
1665                                ),
1666                                obj=cls,
1667                                id="models.W045",
1668                            ),
1669                        )
1670        for field_name, *lookups in references:
1671            fields.add(field_name)
1672            if not lookups:
1673                # If it has no lookups it cannot result in a JOIN.
1674                continue
1675            try:
1676                field = cls._meta.get_field(field_name)
1677                if not field.is_relation or field.many_to_many or field.one_to_many:
1678                    continue
1679            except FieldDoesNotExist:
1680                continue
1681            # JOIN must happen at the first lookup.
1682            first_lookup = lookups[0]
1683            if (
1684                hasattr(field, "get_transform")
1685                and hasattr(field, "get_lookup")
1686                and field.get_transform(first_lookup) is None
1687                and field.get_lookup(first_lookup) is None
1688            ):
1689                errors.append(
1690                    preflight.Error(
1691                        f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1692                        obj=cls,
1693                        id="models.E041",
1694                    )
1695                )
1696        errors.extend(cls._check_local_fields(fields, "constraints"))
1697        return errors
1698
1699
1700########
1701# MISC #
1702########
1703
1704
def model_unpickle(model_id):
    """
    Used to unpickle Model subclasses with deferred fields.

    `model_id` is either a tuple, whose items are passed as positional
    arguments to `models_registry.get_model()`, or the model class itself
    (backwards compatibility: earlier versions cached the model directly).

    Return a new instance of the model created with `__new__`, so `__init__`
    is not called.
    """
    model = (
        models_registry.get_model(*model_id)
        if isinstance(model_id, tuple)
        else model_id
    )
    return model.__new__(model)


model_unpickle.__safe_for_unpickle__ = True