from __future__ import annotations

import copy
import warnings
from collections.abc import Iterable, Iterator, Sequence
from itertools import chain
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from plain.models.meta import Meta
    from plain.models.options import Options

import plain.runtime
from plain.exceptions import NON_FIELD_ERRORS, ValidationError
from plain.models import models_registry, transaction
from plain.models.constants import LOOKUP_SEP
from plain.models.constraints import CheckConstraint, UniqueConstraint
from plain.models.db import (
    PLAIN_VERSION_PICKLE_KEY,
    DatabaseError,
    db_connection,
)
from plain.models.deletion import Collector
from plain.models.exceptions import (
    DoesNotExistDescriptor,
    FieldDoesNotExist,
    MultipleObjectsReturnedDescriptor,
)
from plain.models.expressions import RawSQL, Value
from plain.models.fields import NOT_PROVIDED, PrimaryKeyField
from plain.models.fields.reverse_related import ForeignObjectRel
from plain.models.meta import Meta
from plain.models.options import Options
from plain.models.query import F, Q, QuerySet
from plain.preflight import PreflightResult
from plain.utils.encoding import force_str
from plain.utils.hashable import make_hashable


class Deferred:
    def __repr__(self) -> str:
        return "<Deferred field>"

    def __str__(self) -> str:
        return "<Deferred field>"

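# Sentinel for deferred fields: from_db() substitutes DEFERRED for any concrete
# field that was not loaded from the database, and __init__() skips assigning
# attributes whose incoming value is this sentinel.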
DEFERRED = Deferred()


class ModelBase(type):
    """Metaclass for all models."""

    def __new__(
        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
    ) -> type:
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models are required to inherit directly from models.Model, not from a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )

        return super().__new__(cls, name, bases, attrs, **kwargs)

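# Illustrative sketch (not part of this module): because of the check above,
# concrete models must inherit from Model directly. Assuming two hypothetical
# models, the second definition raises TypeError at class creation time:
#
#     class Article(Model):           # OK: extends Model directly
#         ...
#
#     class SpecialArticle(Article):  # TypeError: a model can't extend another model
#         ...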

class ModelStateFieldsCacheDescriptor:
    def __get__(
        self, instance: ModelState | None, cls: type | None = None
    ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
        if instance is None:
            return self
        res = instance.fields_cache = {}
        return res


class ModelState:
    """Store model instance state."""

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    fields_cache = ModelStateFieldsCacheDescriptor()


class Model(metaclass=ModelBase):
    # Every model gets an automatic id field
    id = PrimaryKeyField()

    # Descriptors for other model behavior
    query = QuerySet()
    model_options = Options()
    _model_meta = Meta()
    DoesNotExist = DoesNotExistDescriptor()
    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
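    # Usage sketch (illustrative; assumes a hypothetical ``User`` model defined
    # elsewhere). The descriptors above provide the query/exception interface:
    #
    #     user = User.query.get(id=1)      # QuerySet descriptor
    #     User.DoesNotExist                # raised when no row matches a get()
    #     User.MultipleObjectsReturned     # raised when several rows match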

    def __init__(self, *args: Any, **kwargs: Any):
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        meta = cls._model_meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work.
        # The reason for the kwargs check is that the standard iterator passes
        # values in by args, and instantiation for iteration is 33% faster.
        if len(args) > len(meta.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            fields_iter = iter(meta.concrete_fields)
            # The ordering of the zip calls matters - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the second
            # is *not* consumed. We rely on this, so don't change the order
            # without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(meta.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            property_names = meta._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        meta.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()

    @classmethod
    def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
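        # When fewer values than concrete fields are given (deferred loading),
        # pad the missing positions with the DEFERRED sentinel so __init__
        # leaves those attributes unset.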
        if len(values) != len(cls._model_meta.concrete_fields):
            values_iter = iter(values)
            values = [
                next(values_iter) if f.attname in field_names else DEFERRED
                for f in cls._model_meta.concrete_fields
            ]
        new = cls(*values)
        new._state.adding = False
        return new

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self}>"

    def __str__(self) -> str:
        return f"{self.__class__.__name__} object ({self.id})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Model):
            return NotImplemented
        if self.__class__ != other.__class__:
            return False
        my_id = self.id
        if my_id is None:
            return self is other
        return my_id == other.id

    def __hash__(self) -> int:
        if self.id is None:
            raise TypeError("Model instances without primary key value are unhashable")
        return hash(self.id)

    def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
        data = self.__getstate__()
        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
        class_id = (
            self.model_options.package_label,
            self.model_options.object_name,
        )
        return model_unpickle, (class_id,), data

    def __getstate__(self) -> dict[str, Any]:
        """Hook to allow choosing the attributes to pickle."""
        state = self.__dict__.copy()
        state["_state"] = copy.copy(state["_state"])
        state["_state"].fields_cache = state["_state"].fields_cache.copy()
        # memoryview cannot be pickled, so cast it to bytes and store
        # separately.
        _memoryview_attrs = []
        for attr, value in state.items():
            if isinstance(value, memoryview):
                _memoryview_attrs.append((attr, bytes(value)))
        if _memoryview_attrs:
            state["_memoryview_attrs"] = _memoryview_attrs
            for attr, value in _memoryview_attrs:
                state.pop(attr)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
        if pickled_version:
            if pickled_version != plain.runtime.__version__:
                warnings.warn(
                    f"Pickled model instance's Plain version {pickled_version} does not "
                    f"match the current version {plain.runtime.__version__}.",
                    RuntimeWarning,
                    stacklevel=2,
                )
        else:
            warnings.warn(
                "Pickled model instance's Plain version is not specified.",
                RuntimeWarning,
                stacklevel=2,
            )
        if "_memoryview_attrs" in state:
            for attr, value in state.pop("_memoryview_attrs"):
                state[attr] = memoryview(value)
        self.__dict__.update(state)

    def get_deferred_fields(self) -> set[str]:
        """
        Return a set containing names of deferred fields for this instance.
        """
        return {
            f.attname
            for f in self._model_meta.concrete_fields
            if f.attname not in self.__dict__
        }

    def refresh_from_db(self, fields: list[str] | None = None) -> None:
        """
        Reload field values from the database.

        The fields argument can be used to specify which fields to reload; it
        should be an iterable of field attnames. If fields is None, all
        non-deferred fields are reloaded.

        When accessing deferred fields of an instance, the deferred loading
        of the field will call this method.
        """
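        # Illustrative usage (assumes a hypothetical ``article`` instance):
        #
        #     article.refresh_from_db()                       # reload all loaded fields
        #     article.refresh_from_db(fields=["headline"])    # reload a single field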
        if fields is None:
            self._prefetched_objects_cache = {}
        else:
            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
            for field in fields:
                if field in prefetched_objects_cache:
                    del prefetched_objects_cache[field]  # type: ignore[misc]
                    fields.remove(field)
            if not fields:
                return
            if any(LOOKUP_SEP in f for f in fields):
                raise ValueError(
                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
                    "are not allowed in fields."
                )

        db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)

        # Use provided fields, if not set then reload all non-deferred fields.
        deferred_fields = self.get_deferred_fields()
        if fields is not None:
            fields = list(fields)
            db_instance_qs = db_instance_qs.only(*fields)
        elif deferred_fields:
            fields = [
                f.attname
                for f in self._model_meta.concrete_fields
                if f.attname not in deferred_fields
            ]
            db_instance_qs = db_instance_qs.only(*fields)

        db_instance = db_instance_qs.get()
        non_loaded_fields = db_instance.get_deferred_fields()
        for field in self._model_meta.concrete_fields:
            if field.attname in non_loaded_fields:
                # This field wasn't refreshed - skip ahead.
                continue
            setattr(self, field.attname, getattr(db_instance, field.attname))
            # Clear cached foreign keys.
            if field.is_relation and field.is_cached(self):
                field.delete_cached_value(self)

        # Clear cached relations.
        for field in self._model_meta.related_objects:
            if field.is_cached(self):
                field.delete_cached_value(self)

    def serializable_value(self, field_name: str) -> Any:
        """
        Return the value of the field name for this instance. If the field is
        a foreign key, return the id value instead of the object. If there's
        no Field object with this name on the model, return the model
        attribute's value.

        Used to serialize a field's value (in the serializer, or form output,
        for example). Normally, you would just access the attribute directly
        and not use this method.
        """
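        # Sketch (assumes a hypothetical ``post`` instance with an ``author``
        # ForeignKey): ``post.serializable_value("author")`` returns the stored
        # ``author_id`` value rather than the related object.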
        try:
            field = self._model_meta.get_field(field_name)
        except FieldDoesNotExist:
            return getattr(self, field_name)
        return getattr(self, field.attname)

    def save(
        self,
        *,
        clean_and_validate: bool = True,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
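        # Illustrative usage (assumes a hypothetical ``article`` instance):
        #
        #     article.headline = "Updated"
        #     article.save(update_fields=["headline"])   # UPDATE only that column
        #     article.save(clean_and_validate=False)     # skip full_clean()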
        self._prepare_related_fields_for_save(operation_name="save")

        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # If update_fields is empty, there is nothing to update, so skip
            # the save entirely.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._model_meta._non_pk_concrete_field_names
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If this model is deferred, automatically do an "update_fields" save
        # on the loaded fields.
        elif not force_insert and deferred_fields:
            field_names = set()
            for field in self._model_meta.concrete_fields:
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

    def save_base(
        self,
        *,
        raw: bool = False,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks.

        The 'raw' argument tells save_base not to do any changes to the values
        before save. This is used by fixture loading.
        """
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        with transaction.mark_for_rollback_on_error():
            self._save_table(
                raw=raw,
                cls=cls,
                force_insert=force_insert,
                force_update=force_update,
                update_fields=update_fields,
            )
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False

    def _save_table(
        self,
        *,
        raw: bool,
        cls: type[Model],
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> bool:
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._model_meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            id_field = meta.get_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_field("id").default
            and meta.get_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                id_field = meta.get_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated

    def _do_update(
        self,
        base_qs: QuerySet,
        id_val: Any,
        values: list[tuple[Any, Any, Any]],
        update_fields: Iterable[str] | None,
        forced_update: bool,
    ) -> bool:
        """
        Try to update the model. Return True if the model was updated (if an
        update query was done and a matching row was found in the DB).
        """
        filtered = base_qs.filter(id=id_val)
        if not values:
            # We can end up here when update_fields doesn't target any concrete
            # field on this model. In that case we just say the update
            # succeeded. Another case ending up here is a model with just a PK
            # - in that case check that the PK still exists.
            return update_fields is not None or filtered.exists()
        return filtered._update(values) > 0

    def _do_insert(
        self,
        manager: QuerySet,
        fields: Sequence[Any],
        returning_fields: Sequence[Any],
        raw: bool,
    ) -> list[Any]:
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
        return manager._insert(  # type: ignore[return-value, arg-type]
            [self],
            fields=fields,  # type: ignore[arg-type]
            returning_fields=returning_fields,  # type: ignore[arg-type]
            raw=raw,
        )

    def _prepare_related_fields_for_save(
        self, operation_name: str, fields: Sequence[Any] | None = None
    ) -> None:
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the
        # save would result in silent data loss.
        for field in self._model_meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)

    def delete(self) -> tuple[int, dict[str, int]]:
        if self.id is None:
            raise ValueError(
                f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
                "to None."
            )
        collector = Collector(origin=self)
        collector.collect([self])
        return collector.delete()

    def get_field_display(self, field_name: str) -> str:
        """Get the display value for a field, especially useful for fields with choices."""
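        # Sketch (assumes a hypothetical ``status`` field defined with
        # choices=[("draft", "Draft"), ("published", "Published")]):
        #
        #     obj.get_field_display("status")   # -> "Draft" when the value is "draft"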
        # Get the field object from the field name
        field = self._model_meta.get_field(field_name)
        value = getattr(self, field.attname)

        # If field has no choices, just return the value as string
        if not hasattr(field, "flatchoices") or not field.flatchoices:
            return force_str(value, strings_only=True)

        # For fields with choices, look up the display value
        choices_dict = dict(make_hashable(field.flatchoices))
        return force_str(
            choices_dict.get(make_hashable(value), value), strings_only=True
        )

    def _get_field_value_map(
        self, meta: Meta | None, exclude: set[str] | None = None
    ) -> dict[str, Value]:
        if exclude is None:
            exclude = set()
        meta = meta or self._model_meta
        return {
            field.name: Value(getattr(self, field.attname), field)
            for field in meta.local_concrete_fields
            if field.name not in exclude
        }

    def prepare_database_save(self, field: Any) -> Any:
        if self.id is None:
            raise ValueError(
                f"Unsaved model instance {self!r} cannot be used in an ORM query."
            )
        return getattr(self, field.remote_field.get_related_field().attname)

    def clean(self) -> None:
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.
        """
        pass

    def validate_unique(self, exclude: set[str] | None = None) -> None:
        """
        Check unique constraints on the model and raise ValidationError if any
        failed.
        """
        unique_checks = self._get_unique_checks(exclude=exclude)

        if errors := self._perform_unique_checks(unique_checks):
            raise ValidationError(errors)

    def _get_unique_checks(
        self, exclude: set[str] | None = None
    ) -> list[tuple[type, tuple[str, ...]]]:
        """
        Return a list of checks to perform. Since validate_unique() could be
        called from a ModelForm, some fields may have been excluded; we can't
        perform a unique check on a model that is missing fields involved
        in that check. Fields that did not validate should also be excluded,
        but they need to be passed in via the exclude argument.
        """
        if exclude is None:
            exclude = set()
        unique_checks = []

        # Gather a list of checks for fields declared as unique and add them to
        # the list of checks.

        fields_with_class = [(self.__class__, self._model_meta.local_fields)]

        for model_class, fields in fields_with_class:
            for f in fields:
                name = f.name
                if name in exclude:
                    continue
                if f.primary_key:
                    unique_checks.append((model_class, (name,)))

        return unique_checks

    def _perform_unique_checks(
        self, unique_checks: list[tuple[type, tuple[str, ...]]]
    ) -> dict[str, list[ValidationError]]:
        errors = {}

        for model_class, unique_check in unique_checks:
            # Try to look up an existing object with the same values as this
            # object's values for all the unique fields.

            lookup_kwargs = {}
            for field_name in unique_check:
                f = self._model_meta.get_field(field_name)
                lookup_value = getattr(self, f.attname)
                # TODO: Handle multiple backends with different feature flags.
                if lookup_value is None:
                    # no value, skip the lookup
                    continue
                if f.primary_key and not self._state.adding:
                    # no need to check for unique primary key when editing
                    continue
                lookup_kwargs[str(field_name)] = lookup_value

            # some fields were skipped, no reason to do the check
            if len(unique_check) != len(lookup_kwargs):
                continue

            qs = model_class.query.filter(**lookup_kwargs)  # type: ignore[attr-defined]

            # Exclude the current object from the query if we are editing an
            # instance (as opposed to creating a new one)
            # Use the primary key defined by model_class. In previous versions
            # this could differ from `self.id` due to model inheritance.
            model_class_id = getattr(self, "id")
            if not self._state.adding and model_class_id is not None:
                qs = qs.exclude(id=model_class_id)
            if qs.exists():
                if len(unique_check) == 1:
                    key = unique_check[0]
                else:
                    key = NON_FIELD_ERRORS
                errors.setdefault(key, []).append(
                    self.unique_error_message(model_class, unique_check)
                )

        return errors

    def unique_error_message(
        self, model_class: type[Model], unique_check: tuple[str, ...]
    ) -> ValidationError:
        meta = model_class._model_meta

        params = {
            "model": self,
            "model_class": model_class,
            "model_name": model_class.model_options.model_name,
            "unique_check": unique_check,
        }

        if len(unique_check) == 1:
            field = meta.get_field(unique_check[0])
            params["field_label"] = field.name
            return ValidationError(
                message=field.error_messages["unique"],
                code="unique",
                params=params,
            )
        else:
            field_names = [meta.get_field(f).name for f in unique_check]

            # Put an "and" before the last one
            field_names[-1] = f"and {field_names[-1]}"

            if len(field_names) > 2:
                # Comma join if more than 2
                params["field_label"] = ", ".join(field_names)
            else:
                # Just a space if there are only 2
                params["field_label"] = " ".join(field_names)

            # Use the first field as the message format...
            message = meta.get_field(unique_check[0]).error_messages["unique"]

            return ValidationError(
                message=message,
                code="unique",
                params=params,
            )

    def get_constraints(self) -> list[tuple[type, list[Any]]]:
        constraints = [(self.__class__, self.model_options.constraints)]
        return constraints

    def validate_constraints(self, exclude: set[str] | None = None) -> None:
        constraints = self.get_constraints()

        errors = {}
        for model_class, model_constraints in constraints:
            for constraint in model_constraints:
                try:
                    constraint.validate(model_class, self, exclude=exclude)
                except ValidationError as e:
                    if (
                        getattr(e, "code", None) == "unique"
                        and len(constraint.fields) == 1
                    ):
                        errors.setdefault(constraint.fields[0], []).append(e)
                    else:
                        errors = e.update_error_dict(errors)
        if errors:
            raise ValidationError(errors)

    def full_clean(
        self,
        *,
        exclude: set[str] | Iterable[str] | None = None,
        validate_unique: bool = True,
        validate_constraints: bool = True,
    ) -> None:
        """
        Call clean_fields(), clean(), validate_unique(), and
        validate_constraints() on the model. Raise a ValidationError for any
        errors that occur.
        """
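        # Illustrative usage (assumes a hypothetical ``article`` instance):
        #
        #     try:
        #         article.full_clean(exclude={"slug"}, validate_unique=False)
        #     except ValidationError as e:
        #         ...  # handle or display the collected errors
        #
        # Note that save() already calls full_clean() unless it is given
        # clean_and_validate=False.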
        errors = {}
        if exclude is None:
            exclude = set()
        else:
            exclude = set(exclude)

        try:
            self.clean_fields(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Form.clean() is run even if other validation fails, so do the
        # same with Model.clean() for consistency.
        try:
            self.clean()
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Run unique checks, but only for fields that passed validation.
        if validate_unique:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_unique(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        # Run constraints checks, but only for fields that passed validation.
        if validate_constraints:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_constraints(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        if errors:
            raise ValidationError(errors)

    def clean_fields(self, exclude: set[str] | None = None) -> None:
        """
        Clean all fields and raise a ValidationError containing a dict
        of all validation errors if any occur.
        """
        if exclude is None:
            exclude = set()

        errors = {}
        for f in self._model_meta.fields:
            if f.name in exclude:
                continue
            # Skip validation for empty fields with required=False. The developer
            # is responsible for making sure they have a valid value.
            raw_value = getattr(self, f.attname)
            if not f.required and raw_value in f.empty_values:
                continue
            try:
                setattr(self, f.attname, f.clean(raw_value, self))
            except ValidationError as e:
                errors[f.name] = e.error_list

        if errors:
            raise ValidationError(errors)

    @classmethod
    def preflight(cls) -> list[PreflightResult]:
        errors = []

        errors += [
            *cls._check_fields(),
            *cls._check_m2m_through_same_relationship(),
            *cls._check_long_column_names(),
        ]
        clash_errors = (
            *cls._check_id_field(),
            *cls._check_field_name_clashes(),
            *cls._check_model_name_db_lookup_clashes(),
            *cls._check_property_name_related_field_accessor_clashes(),
            *cls._check_single_primary_key(),
        )
        errors.extend(clash_errors)
        # If there are field name clashes, hide consequent column name
        # clashes.
        if not clash_errors:
            errors.extend(cls._check_column_name_clashes())
        errors += [
            *cls._check_indexes(),
            *cls._check_ordering(),
            *cls._check_constraints(),
            *cls._check_db_table_comment(),
        ]

        return errors

    @classmethod
    def _check_db_table_comment(cls) -> list[PreflightResult]:
        if not cls.model_options.db_table_comment:
            return []
        errors = []
        if not (
            db_connection.features.supports_comments
            or "supports_comments" in cls.model_options.required_db_features
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support comments on "
                    f"tables (db_table_comment).",
                    obj=cls,
                    id="models.db_table_comment_unsupported",
                    warning=True,
                )
            )
        return errors

    @classmethod
    def _check_fields(cls) -> list[PreflightResult]:
        """Perform all field checks."""
        errors = []
        for field in cls._model_meta.local_fields:
            errors.extend(field.preflight(from_model=cls))
        for field in cls._model_meta.local_many_to_many:
            errors.extend(field.preflight(from_model=cls))
        return errors

    @classmethod
    def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
        """Check that no relationship model is used by more than one m2m field."""

        errors = []
        seen_intermediary_signatures = []

        fields = cls._model_meta.local_many_to_many

        # Skip when the target model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))

        # Skip when the relationship model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))

        for f in fields:
            signature = (
                f.remote_field.model,
                cls,
                f.remote_field.through,
                f.remote_field.through_fields,
            )
            if signature in seen_intermediary_signatures:
                errors.append(
                    PreflightResult(
                        fix="The model has two identical many-to-many relations "
                        f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
                        obj=cls,
                        id="models.duplicate_many_to_many_relations",
                    )
                )
            else:
                seen_intermediary_signatures.append(signature)
        return errors

    @classmethod
    def _check_id_field(cls) -> list[PreflightResult]:
        """Disallow user-defined fields named ``id``."""
        if any(
            f
            for f in cls._model_meta.local_fields
            if f.name == "id" and not f.auto_created
        ):
            return [
                PreflightResult(
                    fix="'id' is a reserved word that cannot be used as a field name.",
                    obj=cls,
                    id="models.reserved_field_name_id",
                )
            ]
        return []

    @classmethod
    def _check_field_name_clashes(cls) -> list[PreflightResult]:
        """Forbid fields whose name or attname clashes with another field's."""
        errors = []
        used_fields = {}  # name or attname -> field

        for f in cls._model_meta.local_fields:
            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
            # Note that we may detect clash between user-defined non-unique
            # field "id" and automatically added unique field "id", both
            # defined at the same model. This special case is considered in
            # _check_id_field and here we ignore it.
            id_conflict = (
                f.name == "id" and clash and clash.name == "id" and clash.model == cls
            )
            if clash and not id_conflict:
                errors.append(
                    PreflightResult(
                        fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
                        f"from model '{clash.model.model_options}'.",
                        obj=f,
                        id="models.field_name_clash",
                    )
                )
            used_fields[f.name] = f
            used_fields[f.attname] = f

        return errors

    @classmethod
    def _check_column_name_clashes(cls) -> list[PreflightResult]:
        # Store a list of column names which have already been used by other fields.
        used_column_names = []
        errors = []

        for f in cls._model_meta.local_fields:
            _, column_name = f.get_attname_column()

            # Ensure the column name is not already in use.
            if column_name and column_name in used_column_names:
                errors.append(
                    PreflightResult(
                        fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
                        "another field. Specify a 'db_column' for the field.",
                        obj=cls,
                        id="models.db_column_clash",
                    )
                )
            else:
                used_column_names.append(column_name)

        return errors

    @classmethod
    def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
        errors = []
        model_name = cls.__name__
        if model_name.startswith("_") or model_name.endswith("_"):
            errors.append(
                PreflightResult(
                    fix=f"The model name '{model_name}' cannot start or end with an underscore "
                    "as it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.model_name_underscore_bounds",
                )
            )
        elif LOOKUP_SEP in model_name:
            errors.append(
                PreflightResult(
                    fix=f"The model name '{model_name}' cannot contain double underscores as "
                    "it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.model_name_double_underscore",
                )
            )
        return errors

    @classmethod
    def _check_property_name_related_field_accessor_clashes(
        cls,
    ) -> list[PreflightResult]:
        errors = []
        property_names = cls._model_meta._property_names
        related_field_accessors = (
            f.get_attname()
            for f in cls._model_meta._get_fields(reverse=False)
            if f.is_relation and f.related_model is not None
        )
        for accessor in related_field_accessors:
            if accessor in property_names:
                errors.append(
                    PreflightResult(
                        fix=f"The property '{accessor}' clashes with a related field "
                        "accessor.",
                        obj=cls,
                        id="models.property_related_field_clash",
                    )
                )
        return errors

    @classmethod
    def _check_single_primary_key(cls) -> list[PreflightResult]:
        errors = []
        if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
            errors.append(
                PreflightResult(
                    fix="The model cannot have more than one field with "
                    "'primary_key=True'.",
                    obj=cls,
                    id="models.multiple_primary_keys",
                )
            )
        return errors

    @classmethod
    def _check_indexes(cls) -> list[PreflightResult]:
        """Check fields, names, and conditions of indexes."""
        errors = []
        references = set()
        for index in cls.model_options.indexes:
            # Index name can't start with an underscore or a number, restricted
            # for cross-database compatibility with Oracle.
            if index.name[0] == "_" or index.name[0].isdigit():
                errors.append(
                    PreflightResult(
                        fix=f"The index name '{index.name}' cannot start with an underscore "
                        "or a number.",
                        obj=cls,
                        id="models.index_name_invalid_start",
                    ),
                )
            if len(index.name) > index.max_name_length:
                errors.append(
                    PreflightResult(
                        fix="The index name '%s' cannot be longer than %d "  # noqa: UP031
                        "characters." % (index.name, index.max_name_length),
                        obj=cls,
                        id="models.index_name_too_long",
                    ),
                )
            if index.contains_expressions:
                for expression in index.expressions:
                    references.update(
                        ref[0] for ref in cls._get_expr_references(expression)
                    )
        if not (
            db_connection.features.supports_partial_indexes
            or "supports_partial_indexes" in cls.model_options.required_db_features
        ) and any(index.condition is not None for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes with conditions. "
                    "Conditions will be ignored. Silence this warning "
                    "if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_conditions_ignored",
                )
            )
        if not (
            db_connection.features.supports_covering_indexes
            or "supports_covering_indexes" in cls.model_options.required_db_features
        ) and any(index.include for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
                    "Non-key columns will be ignored. Silence this "
                    "warning if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_non_key_columns_ignored",
                )
            )
        if not (
            db_connection.features.supports_expression_indexes
            or "supports_expression_indexes" in cls.model_options.required_db_features
        ) and any(index.contains_expressions for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes on expressions. "
                    "An index won't be created. Silence this warning "
                    "if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_on_foreign_key",
                )
            )
        fields = [
            field
            for index in cls.model_options.indexes
            for field, _ in index.fields_orders
        ]
        fields += [
            include for index in cls.model_options.indexes for include in index.include
        ]
        fields += references
        errors.extend(cls._check_local_fields(fields, "indexes"))
        return errors

    @classmethod
    def _check_local_fields(
        cls, fields: Iterable[str], option: str
    ) -> list[PreflightResult]:
        from plain import models

        # In order to avoid hitting the relation tree prematurely, we use our
        # own fields_map instead of using get_field()
        forward_fields_map = {}
        for field in cls._model_meta._get_fields(reverse=False):
            forward_fields_map[field.name] = field
            if hasattr(field, "attname"):
                forward_fields_map[field.attname] = field

        errors = []
        for field_name in fields:
            try:
                field = forward_fields_map[field_name]
            except KeyError:
                errors.append(
                    PreflightResult(
                        fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
                        obj=cls,
                        id="models.nonexistent_field_reference",
                    )
                )
            else:
                if isinstance(field.remote_field, models.ManyToManyRel):
                    errors.append(
                        PreflightResult(
                            fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
                            f"ManyToManyFields are not permitted in '{option}'.",
                            obj=cls,
                            id="models.m2m_field_in_meta_option",
                        )
                    )
                elif field not in cls._model_meta.local_fields:
                    errors.append(
                        PreflightResult(
                            fix=f"'{option}' refers to field '{field_name}' which is not local to model "
                            f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
                            obj=cls,
                            id="models.non_local_field_reference",
                        )
                    )
        return errors

    @classmethod
    def _check_ordering(cls) -> list[PreflightResult]:
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?
        """

        if not cls.model_options.ordering:
            return []

        if not isinstance(cls.model_options.ordering, list | tuple):
            return [
                PreflightResult(
                    fix="'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="models.ordering_not_tuple_or_list",
                )
            ]

        errors = []
        fields = cls.model_options.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    fld = _cls._model_meta.get_field(part)
                    if fld.is_relation:
                        _cls = fld.path_infos[-1].to_meta.model
                    else:
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    if fld is None or (
                        fld.get_transform(part) is None and fld.get_lookup(part) is None
                    ):
                        errors.append(
                            PreflightResult(
                                fix="'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="models.ordering_nonexistent_field",
                            )
                        )

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        meta = cls._model_meta
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(meta.fields, meta.related_objects)
            )
        )

        invalid_fields.extend(set(fields) - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                PreflightResult(
                    fix="'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="models.ordering_nonexistent_field",
                )
            )
        return errors

    @classmethod
    def _check_long_column_names(cls) -> list[PreflightResult]:
        """
        Check that any auto-generated column names are shorter than the limit
        for the database in which the model will be created.
        """
        errors = []
        allowed_len = None

        max_name_length = db_connection.ops.max_name_length()
        if max_name_length is not None and not db_connection.features.truncates_names:
            allowed_len = max_name_length

        if allowed_len is None:
            return errors

        for f in cls._model_meta.local_fields:
            _, column_name = f.get_attname_column()

            # Check if auto-generated name for the field is too long
            # for the database.
            if (
                f.db_column is None
                and column_name is not None
                and len(column_name) > allowed_len
            ):
                errors.append(
                    PreflightResult(
                        fix=f'Autogenerated column name too long for field "{column_name}". '
                        f'Maximum length is "{allowed_len}" for the database. '
                        "Set the column name manually using 'db_column'.",
                        obj=cls,
                        id="models.autogenerated_column_name_too_long",
                    )
                )

        for f in cls._model_meta.local_many_to_many:
            # Skip nonexistent models.
            if isinstance(f.remote_field.through, str):
                continue

            # Check if auto-generated name for the M2M field is too long
            # for the database.
            for m2m in f.remote_field.through._model_meta.local_fields:
                _, rel_name = m2m.get_attname_column()
                if (
                    m2m.db_column is None
                    and rel_name is not None
                    and len(rel_name) > allowed_len
                ):
                    errors.append(
                        PreflightResult(
                            fix="Autogenerated column name too long for M2M field "
                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
                            "Use 'through' to create a separate model for "
                            "M2M and then set column_name using 'db_column'.",
                            obj=cls,
                            id="models.m2m_column_name_too_long",
                        )
                    )

        return errors

    @classmethod
    def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
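        # Yield the field/lookup paths referenced by an expression as tuples of
        # lookup parts, recursing through Q objects, F expressions, and any
        # nested source expressions.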
        if isinstance(expr, Q):
            for child in expr.children:
                if isinstance(child, tuple):
                    lookup, value = child
                    yield tuple(lookup.split(LOOKUP_SEP))
                    yield from cls._get_expr_references(value)
                else:
                    yield from cls._get_expr_references(child)
        elif isinstance(expr, F):
            yield tuple(expr.name.split(LOOKUP_SEP))
        elif hasattr(expr, "get_source_expressions"):
            for src_expr in expr.get_source_expressions():
                yield from cls._get_expr_references(src_expr)

    @classmethod
    def _check_constraints(cls) -> list[PreflightResult]:
        errors = []
        if not (
            db_connection.features.supports_table_check_constraints
            or "supports_table_check_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, CheckConstraint)
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support check constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_non_db_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_partial_indexes
            or "supports_partial_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.condition is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with "
                    "conditions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_virtual_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_deferrable_unique_constraints
            or "supports_deferrable_unique_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.deferrable is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_foreign_key",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_covering_indexes
            or "supports_covering_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.include
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with non-key "
                    "columns. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_m2m_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_expression_indexes
            or "supports_expression_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints on "
                    "expressions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_self_referencing_fk",
                    warning=True,
                )
            )
        fields = set(
            chain.from_iterable(
                (*constraint.fields, *constraint.include)
                for constraint in cls.model_options.constraints
                if isinstance(constraint, UniqueConstraint)
            )
        )
        references = set()
        for constraint in cls.model_options.constraints:
            if isinstance(constraint, UniqueConstraint):
                if (
                    db_connection.features.supports_partial_indexes
                    or "supports_partial_indexes"
                    not in cls.model_options.required_db_features
                ) and isinstance(constraint.condition, Q):
                    references.update(cls._get_expr_references(constraint.condition))
                if (
                    db_connection.features.supports_expression_indexes
                    or "supports_expression_indexes"
                    not in cls.model_options.required_db_features
                ) and constraint.contains_expressions:
                    for expression in constraint.expressions:
                        references.update(cls._get_expr_references(expression))
            elif isinstance(constraint, CheckConstraint):
                if (
                    db_connection.features.supports_table_check_constraints
                    or "supports_table_check_constraints"
                    not in cls.model_options.required_db_features
                ):
                    if isinstance(constraint.check, Q):
                        references.update(cls._get_expr_references(constraint.check))
                    if any(
                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
                    ):
                        errors.append(
                            PreflightResult(
                                fix=f"Check constraint {constraint.name!r} contains "
                                f"RawSQL() expression and won't be validated "
                                f"during the model full_clean(). "
                                "Silence this warning if you don't care about it.",
                                warning=True,
                                obj=cls,
                                id="models.constraint_name_collision_autogenerated",
                            ),
                        )
        for field_name, *lookups in references:
            fields.add(field_name)
            if not lookups:
                # If it has no lookups it cannot result in a JOIN.
                continue
            try:
                field = cls._model_meta.get_field(field_name)
                if not field.is_relation or field.many_to_many or field.one_to_many:
                    continue
            except FieldDoesNotExist:
                continue
            # JOIN must happen at the first lookup.
            first_lookup = lookups[0]
            if (
                hasattr(field, "get_transform")
                and hasattr(field, "get_lookup")
                and field.get_transform(first_lookup) is None
                and field.get_lookup(first_lookup) is None
            ):
                errors.append(
                    PreflightResult(
                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                        obj=cls,
                        id="models.constraint_refers_to_joined_field",
                    )
                )
        errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors


########
# MISC #
########


def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
    """Used to unpickle Model subclasses with deferred fields."""
    if isinstance(model_id, tuple):
        model = models_registry.get_model(*model_id)
    else:
        # Backwards compat - the model was cached directly in earlier versions.
        model = model_id
    return model.__new__(model)


model_unpickle.__safe_for_unpickle__ = True  # type: ignore[attr-defined]