from __future__ import annotations

import copy
import warnings
from collections.abc import Iterable, Iterator, Sequence
from itertools import chain
from typing import TYPE_CHECKING, Any, ClassVar, dataclass_transform

if TYPE_CHECKING:
    from plain.models.meta import Meta
    from plain.models.options import Options

import plain.runtime
from plain.exceptions import NON_FIELD_ERRORS, ValidationError
from plain.models import models_registry, transaction, types
from plain.models.constants import LOOKUP_SEP
from plain.models.constraints import CheckConstraint, UniqueConstraint
from plain.models.db import (
    PLAIN_VERSION_PICKLE_KEY,
    DatabaseError,
    db_connection,
)
from plain.models.deletion import Collector
from plain.models.exceptions import (
    DoesNotExistDescriptor,
    FieldDoesNotExist,
    MultipleObjectsReturnedDescriptor,
)
from plain.models.expressions import RawSQL, Value
from plain.models.fields import NOT_PROVIDED
from plain.models.fields.reverse_related import ForeignObjectRel
from plain.models.meta import Meta
from plain.models.options import Options
from plain.models.query import F, Q, QuerySet
from plain.preflight import PreflightResult
from plain.utils.encoding import force_str
from plain.utils.hashable import make_hashable


class Deferred:
    def __repr__(self) -> str:
        return "<Deferred field>"

    def __str__(self) -> str:
        return "<Deferred field>"


DEFERRED = Deferred()


@dataclass_transform(kw_only_default=True)
class ModelBase(type):
    """Metaclass for all models."""

    def __new__(
        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
    ) -> type:
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models are required to inherit directly from models.Model, not a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )

        return super().__new__(cls, name, bases, attrs, **kwargs)
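
# Illustrative sketch (not part of this module): the metaclass above rejects
# model-to-model inheritance, so every concrete model must subclass `Model`
# directly. The `Profile` and `AdminProfile` names below are hypothetical.
#
#   class Profile(Model):
#       ...
#
#   class AdminProfile(Profile):  # raises TypeError: a model can't extend another model
#       ...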


class ModelStateFieldsCacheDescriptor:
    def __get__(
        self, instance: ModelState | None, cls: type | None = None
    ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
        if instance is None:
            return self
        res = instance.fields_cache = {}
        return res


class ModelState:
    """Store model instance state."""

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    fields_cache = ModelStateFieldsCacheDescriptor()


class Model(metaclass=ModelBase):
    # Every model gets an automatic id field
    id: int = types.PrimaryKeyField()

    # Descriptors for other model behavior
    query: ClassVar[QuerySet[Model]] = QuerySet()
    model_options = Options()
    _model_meta = Meta()
    DoesNotExist = DoesNotExistDescriptor()
    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()

    def __init__(self, **kwargs: Any):
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        meta = cls._model_meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # Process all fields from kwargs or use defaults
        for field in meta.fields:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if isinstance(field.remote_field, ForeignObjectRel):
                try:
                    # Assume object instance was passed in.
                    rel_obj = kwargs.pop(field.name)
                    is_related_object = True
                except KeyError:
                    try:
                        # Object instance wasn't passed in -- must be an ID.
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        val = field.get_default()
            else:
                try:
                    val = kwargs.pop(field.attname)
                except KeyError:
                    # This is done with an exception rather than the
                    # default argument on pop because we don't want
                    # get_default() to be evaluated, and then not used.
                    # Refs #12057.
                    val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        # Handle any remaining kwargs (properties or virtual fields)
        property_names = meta._property_names
        unexpected = ()
        for prop, value in kwargs.items():
            # Any remaining kwargs must correspond to properties or virtual
            # fields.
            if prop in property_names:
                if value is not _DEFERRED:
                    _setattr(self, prop, value)
            else:
                try:
                    meta.get_field(prop)
                except FieldDoesNotExist:
                    unexpected += (prop,)
                else:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
        if unexpected:
            unexpected_names = ", ".join(repr(n) for n in unexpected)
            raise TypeError(
                f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
            )

        super().__init__()

    @classmethod
    def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
        if len(values) != len(cls._model_meta.concrete_fields):
            values_iter = iter(values)
            values = [
                next(values_iter) if f.attname in field_names else DEFERRED
                for f in cls._model_meta.concrete_fields
            ]
        # Build kwargs dict from field names and values
        field_dict = dict(
            zip((f.attname for f in cls._model_meta.concrete_fields), values)
        )
        new = cls(**field_dict)
        new._state.adding = False
        return new
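
    # Illustrative sketch (assumed names, not part of the public API): this is
    # roughly how the SQL compiler hands a row to `from_db()`. Fields missing
    # from `field_names` come back as DEFERRED and are loaded lazily later via
    # `refresh_from_db()`.
    #
    #   row = (42, "alice")                          # values in field order
    #   obj = MyModel.from_db(["id", "name"], row)   # MyModel is hypothetical
    #   obj._state.adding                            # False: reflects an existing row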

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self}>"

    def __str__(self) -> str:
        return f"{self.__class__.__name__} object ({self.id})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Model):
            return NotImplemented
        if self.__class__ != other.__class__:
            return False
        my_id = self.id
        if my_id is None:
            return self is other
        return my_id == other.id

    def __hash__(self) -> int:
        if self.id is None:
            raise TypeError("Model instances without primary key value are unhashable")
        return hash(self.id)

    def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
        data = self.__getstate__()
        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
        class_id = (
            self.model_options.package_label,
            self.model_options.object_name,
        )
        return model_unpickle, (class_id,), data

    def __getstate__(self) -> dict[str, Any]:
        """Hook to allow choosing the attributes to pickle."""
        state = self.__dict__.copy()
        state["_state"] = copy.copy(state["_state"])
        state["_state"].fields_cache = state["_state"].fields_cache.copy()
        # memoryview cannot be pickled, so cast it to bytes and store
        # separately.
        _memoryview_attrs = []
        for attr, value in state.items():
            if isinstance(value, memoryview):
                _memoryview_attrs.append((attr, bytes(value)))
        if _memoryview_attrs:
            state["_memoryview_attrs"] = _memoryview_attrs
            for attr, value in _memoryview_attrs:
                state.pop(attr)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
        if pickled_version:
            if pickled_version != plain.runtime.__version__:
                warnings.warn(
                    f"Pickled model instance's Plain version {pickled_version} does not "
                    f"match the current version {plain.runtime.__version__}.",
                    RuntimeWarning,
                    stacklevel=2,
                )
        else:
            warnings.warn(
                "Pickled model instance's Plain version is not specified.",
                RuntimeWarning,
                stacklevel=2,
            )
        if "_memoryview_attrs" in state:
            for attr, value in state.pop("_memoryview_attrs"):
                state[attr] = memoryview(value)
        self.__dict__.update(state)

    def get_deferred_fields(self) -> set[str]:
        """
        Return a set containing names of deferred fields for this instance.
        """
        return {
            f.attname
            for f in self._model_meta.concrete_fields
            if f.attname not in self.__dict__
        }

    def refresh_from_db(self, fields: list[str] | None = None) -> None:
        """
        Reload field values from the database.

        Fields can be used to specify which fields to reload. The fields
        should be an iterable of field attnames. If fields is None, then
        all non-deferred fields are reloaded.

        When accessing deferred fields of an instance, the deferred loading
        of the field will call this method.
        """
        if fields is None:
            self._prefetched_objects_cache = {}
        else:
            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
            for field in fields:
                if field in prefetched_objects_cache:
                    del prefetched_objects_cache[field]  # type: ignore[misc]
                    fields.remove(field)
            if not fields:
                return
            if any(LOOKUP_SEP in f for f in fields):
                raise ValueError(
                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
                    "are not allowed in fields."
                )

        db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)

        # Use provided fields, if not set then reload all non-deferred fields.
        deferred_fields = self.get_deferred_fields()
        if fields is not None:
            fields = list(fields)
            db_instance_qs = db_instance_qs.only(*fields)
        elif deferred_fields:
            fields = [
                f.attname
                for f in self._model_meta.concrete_fields
                if f.attname not in deferred_fields
            ]
            db_instance_qs = db_instance_qs.only(*fields)

        db_instance = db_instance_qs.get()
        non_loaded_fields = db_instance.get_deferred_fields()
        for field in self._model_meta.concrete_fields:
            if field.attname in non_loaded_fields:
                # This field wasn't refreshed - skip ahead.
                continue
            setattr(self, field.attname, getattr(db_instance, field.attname))
            # Clear cached foreign keys.
            if field.is_relation and field.is_cached(self):
                field.delete_cached_value(self)

        # Clear cached relations.
        for field in self._model_meta.related_objects:
            if field.is_cached(self):
                field.delete_cached_value(self)
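
    # Illustrative usage sketch (the `user` variable and `email` field are
    # hypothetical): reload everything, or just a couple of columns.
    #
    #   user.refresh_from_db()                   # reload all non-deferred fields
    #   user.refresh_from_db(fields=["email"])   # reload only the email column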

    def serializable_value(self, field_name: str) -> Any:
        """
        Return the value of the field name for this instance. If the field is
        a foreign key, return the id value instead of the object. If there's
        no Field object with this name on the model, return the model
        attribute's value.

        Used to serialize a field's value (in the serializer, or form output,
        for example). Normally, you would just access the attribute directly
        and not use this method.
        """
        try:
            field = self._model_meta.get_field(field_name)
        except FieldDoesNotExist:
            return getattr(self, field_name)
        return getattr(self, field.attname)

    def save(
        self,
        *,
        clean_and_validate: bool = True,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        self._prepare_related_fields_for_save(operation_name="save")

        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._model_meta._non_pk_concrete_field_names
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If this model is deferred, automatically do an "update_fields" save
        # on the loaded fields.
        elif not force_insert and deferred_fields:
            field_names = set()
            for field in self._model_meta.concrete_fields:
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )
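
    # Illustrative usage sketch (the `article` instance and `title` field are
    # hypothetical): a plain save validates first; update_fields narrows the
    # UPDATE to specific columns.
    #
    #   article.title = "New headline"
    #   article.save()                           # full_clean() then INSERT/UPDATE
    #   article.save(update_fields=["title"])    # UPDATE only the title column
    #   article.save(clean_and_validate=False)   # skip validation entirely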

    def save_base(
        self,
        *,
        raw: bool = False,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.

        The 'raw' argument tells save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        with transaction.mark_for_rollback_on_error():
            self._save_table(
                raw=raw,
                cls=cls,
                force_insert=force_insert,
                force_update=force_update,
                update_fields=update_fields,
            )
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False

    def _save_table(
        self,
        *,
        raw: bool,
        cls: type[Model],
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> bool:
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._model_meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            id_field = meta.get_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_field("id").default
            and meta.get_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = bool(update_fields or force_update)
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                id_field = meta.get_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated

    def _do_update(
        self,
        base_qs: QuerySet,
        id_val: Any,
        values: list[tuple[Any, Any, Any]],
        update_fields: Iterable[str] | None,
        forced_update: bool,
    ) -> bool:
        """
        Try to update the model. Return True if the model was updated (if an
        update query was done and a matching row was found in the DB).
        """
        filtered = base_qs.filter(id=id_val)
        if not values:
            # We can end up here when saving a model in inheritance chain where
            # update_fields doesn't target any field in current model. In that
            # case we just say the update succeeded. Another case ending up here
            # is a model with just PK - in that case check that the PK still
            # exists.
            return update_fields is not None or filtered.exists()
        return filtered._update(values) > 0

    def _do_insert(
        self,
        manager: QuerySet,
        fields: Sequence[Any],
        returning_fields: Sequence[Any],
        raw: bool,
    ) -> list[Any]:
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
        return manager._insert(  # type: ignore[return-value, arg-type]
            [self],
            fields=fields,  # type: ignore[arg-type]
            returning_fields=returning_fields,  # type: ignore[arg-type]
            raw=raw,
        )

    def _prepare_related_fields_for_save(
        self, operation_name: str, fields: Sequence[Any] | None = None
    ) -> None:
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the
        # save would result in silent data loss.
        for field in self._model_meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)

    def delete(self) -> tuple[int, dict[str, int]]:
        if self.id is None:
            raise ValueError(
                f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
                "to None."
            )
        collector = Collector(origin=self)
        collector.collect([self])
        return collector.delete()

    def get_field_display(self, field_name: str) -> str:
        """Get the display value for a field, especially useful for fields with choices."""
        # Get the field object from the field name
        field = self._model_meta.get_field(field_name)
        value = getattr(self, field.attname)

        # If field has no choices, just return the value as string
        if not hasattr(field, "flatchoices") or not field.flatchoices:
            return force_str(value, strings_only=True)

        # For fields with choices, look up the display value
        choices_dict = dict(make_hashable(field.flatchoices))
        return force_str(
            choices_dict.get(make_hashable(value), value), strings_only=True
        )
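
    # Illustrative usage sketch (the `order` instance and its `status` field
    # with choices are hypothetical):
    #
    #   order.status = "pending"
    #   order.get_field_display("status")  # -> "Pending", the human-readable label
    #   order.get_field_display("id")      # no choices: falls back to the raw value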

    def _get_field_value_map(
        self, meta: Meta | None, exclude: set[str] | None = None
    ) -> dict[str, Value]:
        if exclude is None:
            exclude = set()
        meta = meta or self._model_meta
        return {
            field.name: Value(getattr(self, field.attname), field)
            for field in meta.local_concrete_fields
            if field.name not in exclude
        }

    def prepare_database_save(self, field: Any) -> Any:
        if self.id is None:
            raise ValueError(
                f"Unsaved model instance {self!r} cannot be used in an ORM query."
            )
        return getattr(self, field.remote_field.get_related_field().attname)

    def clean(self) -> None:
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.
        """
        pass
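
    # Illustrative sketch of overriding the clean() hook above (the `Invoice`
    # model and its fields are hypothetical):
    #
    #   class Invoice(Model):
    #       ...
    #       def clean(self) -> None:
    #           if self.paid_at and self.paid_at < self.issued_at:
    #               raise ValidationError("An invoice can't be paid before it is issued.")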

    def validate_unique(self, exclude: set[str] | None = None) -> None:
        """
        Check unique constraints on the model and raise ValidationError if any
        failed.
        """
        unique_checks = self._get_unique_checks(exclude=exclude)

        if errors := self._perform_unique_checks(unique_checks):
            raise ValidationError(errors)

    def _get_unique_checks(
        self, exclude: set[str] | None = None
    ) -> list[tuple[type[Model], tuple[str, ...]]]:
        """
        Return a list of checks to perform. Since validate_unique() could be
        called from a ModelForm, some fields may have been excluded; we can't
        perform a unique check on a model that is missing fields involved
        in that check. Fields that did not validate should also be excluded,
        but they need to be passed in via the exclude argument.
        """
        if exclude is None:
            exclude = set()
        unique_checks = []

        # Gather a list of checks for fields declared as unique and add them to
        # the list of checks.

        fields_with_class = [(self.__class__, self._model_meta.local_fields)]

        for model_class, fields in fields_with_class:
            for f in fields:
                name = f.name
                if name in exclude:
                    continue
                if f.primary_key:
                    unique_checks.append((model_class, (name,)))

        return unique_checks

    def _perform_unique_checks(
        self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
    ) -> dict[str, list[ValidationError]]:
        errors = {}

        for model_class, unique_check in unique_checks:
            # Try to look up an existing object with the same values as this
            # object's values for all the unique fields.

            lookup_kwargs = {}
            for field_name in unique_check:
                f = self._model_meta.get_field(field_name)
                lookup_value = getattr(self, f.attname)
                # TODO: Handle multiple backends with different feature flags.
                if lookup_value is None:
                    # no value, skip the lookup
                    continue
                if f.primary_key and not self._state.adding:
                    # no need to check for unique primary key when editing
                    continue
                lookup_kwargs[str(field_name)] = lookup_value

            # some fields were skipped, no reason to do the check
            if len(unique_check) != len(lookup_kwargs):
                continue

            qs = model_class.query.filter(**lookup_kwargs)  # type: ignore[attr-defined]

            # Exclude the current object from the query if we are editing an
            # instance (as opposed to creating a new one)
            # Use the primary key defined by model_class. In previous versions
            # this could differ from `self.id` due to model inheritance.
            model_class_id = getattr(self, "id")
            if not self._state.adding and model_class_id is not None:
                qs = qs.exclude(id=model_class_id)
            if qs.exists():
                if len(unique_check) == 1:
                    key = unique_check[0]
                else:
                    key = NON_FIELD_ERRORS
                errors.setdefault(key, []).append(
                    self.unique_error_message(model_class, unique_check)
                )

        return errors

    def unique_error_message(
        self, model_class: type[Model], unique_check: tuple[str, ...]
    ) -> ValidationError:
        meta = model_class._model_meta

        params = {
            "model": self,
            "model_class": model_class,
            "model_name": model_class.model_options.model_name,
            "unique_check": unique_check,
        }

        if len(unique_check) == 1:
            field = meta.get_field(unique_check[0])
            params["field_label"] = field.name
            return ValidationError(
                message=field.error_messages["unique"],
                code="unique",
                params=params,
            )
        else:
            field_names = [meta.get_field(f).name for f in unique_check]

            # Put an "and" before the last one
            field_names[-1] = f"and {field_names[-1]}"

            if len(field_names) > 2:
                # Comma join if more than 2
                params["field_label"] = ", ".join(field_names)
            else:
                # Just a space if there are only 2
                params["field_label"] = " ".join(field_names)

            # Use the first field as the message format...
            message = meta.get_field(unique_check[0]).error_messages["unique"]

            return ValidationError(
                message=message,
                code="unique",
                params=params,
            )

    def get_constraints(self) -> list[tuple[type, list[Any]]]:
        constraints = [(self.__class__, list(self.model_options.constraints))]
        return constraints

    def validate_constraints(self, exclude: set[str] | None = None) -> None:
        constraints = self.get_constraints()

        errors = {}
        for model_class, model_constraints in constraints:
            for constraint in model_constraints:
                try:
                    constraint.validate(model_class, self, exclude=exclude)
                except ValidationError as e:
                    if (
                        getattr(e, "code", None) == "unique"
                        and len(constraint.fields) == 1
                    ):
                        errors.setdefault(constraint.fields[0], []).append(e)
                    else:
                        errors = e.update_error_dict(errors)
        if errors:
            raise ValidationError(errors)

    def full_clean(
        self,
        *,
        exclude: set[str] | Iterable[str] | None = None,
        validate_unique: bool = True,
        validate_constraints: bool = True,
    ) -> None:
        """
        Call clean_fields(), clean(), validate_unique(), and
        validate_constraints() on the model. Raise a ValidationError for any
        errors that occur.
        """
        errors = {}
        if exclude is None:
            exclude = set()
        else:
            exclude = set(exclude)

        try:
            self.clean_fields(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Form.clean() is run even if other validation fails, so do the
        # same with Model.clean() for consistency.
        try:
            self.clean()
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Run unique checks, but only for fields that passed validation.
        if validate_unique:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_unique(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        # Run constraints checks, but only for fields that passed validation.
        if validate_constraints:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_constraints(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        if errors:
            raise ValidationError(errors)
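
    # Illustrative usage sketch (the `article` instance and field names are
    # hypothetical): full_clean() aggregates errors from every stage into a
    # single ValidationError keyed by field name (or NON_FIELD_ERRORS).
    #
    #   try:
    #       article.full_clean(exclude={"slug"})
    #   except ValidationError as e:
    #       print(e)  # e.g. {'title': ['This field cannot be blank.']}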

    def clean_fields(self, exclude: set[str] | None = None) -> None:
        """
        Clean all fields and raise a ValidationError containing a dict
        of all validation errors if any occur.
        """
        if exclude is None:
            exclude = set()

        errors = {}
        for f in self._model_meta.fields:
            if f.name in exclude:
                continue
            # Skip validation for empty fields with required=False. The developer
            # is responsible for making sure they have a valid value.
            raw_value = getattr(self, f.attname)
            if not f.required and raw_value in f.empty_values:
                continue
            try:
                setattr(self, f.attname, f.clean(raw_value, self))
            except ValidationError as e:
                errors[f.name] = e.error_list

        if errors:
            raise ValidationError(errors)

    @classmethod
    def preflight(cls) -> list[PreflightResult]:
        errors = []

        errors += [
            *cls._check_fields(),
            *cls._check_m2m_through_same_relationship(),
            *cls._check_long_column_names(),
        ]
        clash_errors = (
            *cls._check_id_field(),
            *cls._check_field_name_clashes(),
            *cls._check_model_name_db_lookup_clashes(),
            *cls._check_property_name_related_field_accessor_clashes(),
            *cls._check_single_primary_key(),
        )
        errors.extend(clash_errors)
        # If there are field name clashes, hide consequent column name
        # clashes.
        if not clash_errors:
            errors.extend(cls._check_column_name_clashes())
        errors += [
            *cls._check_indexes(),
            *cls._check_ordering(),
            *cls._check_constraints(),
            *cls._check_db_table_comment(),
        ]

        return errors

    @classmethod
    def _check_db_table_comment(cls) -> list[PreflightResult]:
        if not cls.model_options.db_table_comment:
            return []
        errors = []
        if not (
            db_connection.features.supports_comments
            or "supports_comments" in cls.model_options.required_db_features
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support comments on "
                    f"tables (db_table_comment).",
                    obj=cls,
                    id="models.db_table_comment_unsupported",
                    warning=True,
                )
            )
        return errors

    @classmethod
    def _check_fields(cls) -> list[PreflightResult]:
        """Perform all field checks."""
        errors = []
        for field in cls._model_meta.local_fields:
            errors.extend(field.preflight(from_model=cls))
        for field in cls._model_meta.local_many_to_many:
            errors.extend(field.preflight(from_model=cls))
        return errors

    @classmethod
    def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
        """Check if no relationship model is used by more than one m2m field."""

        errors = []
        seen_intermediary_signatures = []

        fields = cls._model_meta.local_many_to_many

        # Skip when the target model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))

        # Skip when the relationship model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))

        for f in fields:
            signature = (
                f.remote_field.model,
                cls,
                f.remote_field.through,
                f.remote_field.through_fields,
            )
            if signature in seen_intermediary_signatures:
                errors.append(
                    PreflightResult(
                        fix="The model has two identical many-to-many relations "
                        f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
                        obj=cls,
                        id="models.duplicate_many_to_many_relations",
                    )
                )
            else:
                seen_intermediary_signatures.append(signature)
        return errors

    @classmethod
    def _check_id_field(cls) -> list[PreflightResult]:
        """Disallow user-defined fields named ``id``."""
        if any(
            f
            for f in cls._model_meta.local_fields
            if f.name == "id" and not f.auto_created
        ):
            return [
                PreflightResult(
                    fix="'id' is a reserved word that cannot be used as a field name.",
                    obj=cls,
                    id="models.reserved_field_name_id",
                )
            ]
        return []

    @classmethod
    def _check_field_name_clashes(cls) -> list[PreflightResult]:
        """Forbid field shadowing in multi-table inheritance."""
        errors = []
        used_fields = {}  # name or attname -> field

        for f in cls._model_meta.local_fields:
            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
            # Note that we may detect clash between user-defined non-unique
            # field "id" and automatically added unique field "id", both
            # defined at the same model. This special case is considered in
            # _check_id_field and here we ignore it.
            id_conflict = (
                f.name == "id" and clash and clash.name == "id" and clash.model == cls
            )
            if clash and not id_conflict:
                errors.append(
                    PreflightResult(
                        fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
                        f"from model '{clash.model.model_options}'.",
                        obj=f,
                        id="models.field_name_clash",
                    )
                )
            used_fields[f.name] = f
            used_fields[f.attname] = f

        return errors

    @classmethod
    def _check_column_name_clashes(cls) -> list[PreflightResult]:
        # Store a list of column names which have already been used by other fields.
        used_column_names = []
        errors = []

        for f in cls._model_meta.local_fields:
            _, column_name = f.get_attname_column()

            # Ensure the column name is not already in use.
            if column_name and column_name in used_column_names:
                errors.append(
                    PreflightResult(
                        fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
                        "another field. Specify a 'db_column' for the field.",
                        obj=cls,
                        id="models.db_column_clash",
                    )
                )
            else:
                used_column_names.append(column_name)

        return errors

    @classmethod
    def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
        errors = []
        model_name = cls.__name__
        if model_name.startswith("_") or model_name.endswith("_"):
            errors.append(
                PreflightResult(
                    fix=f"The model name '{model_name}' cannot start or end with an underscore "
                    "as it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.model_name_underscore_bounds",
                )
            )
        elif LOOKUP_SEP in model_name:
            errors.append(
                PreflightResult(
                    fix=f"The model name '{model_name}' cannot contain double underscores as "
                    "it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.model_name_double_underscore",
                )
            )
        return errors

    @classmethod
    def _check_property_name_related_field_accessor_clashes(
        cls,
    ) -> list[PreflightResult]:
        errors = []
        property_names = cls._model_meta._property_names
        related_field_accessors = (
            f.get_attname()
            for f in cls._model_meta._get_fields(reverse=False)
            if f.is_relation and f.related_model is not None
        )
        for accessor in related_field_accessors:
            if accessor in property_names:
                errors.append(
                    PreflightResult(
                        fix=f"The property '{accessor}' clashes with a related field "
                        "accessor.",
                        obj=cls,
                        id="models.property_related_field_clash",
                    )
                )
        return errors

    @classmethod
    def _check_single_primary_key(cls) -> list[PreflightResult]:
        errors = []
        if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
            errors.append(
                PreflightResult(
                    fix="The model cannot have more than one field with "
                    "'primary_key=True'.",
                    obj=cls,
                    id="models.multiple_primary_keys",
                )
            )
        return errors

    @classmethod
    def _check_indexes(cls) -> list[PreflightResult]:
        """Check fields, names, and conditions of indexes."""
        errors = []
        references = set()
        for index in cls.model_options.indexes:
            # Index name can't start with an underscore or a number, restricted
            # for cross-database compatibility with Oracle.
            if index.name[0] == "_" or index.name[0].isdigit():
                errors.append(
                    PreflightResult(
                        fix=f"The index name '{index.name}' cannot start with an underscore "
                        "or a number.",
                        obj=cls,
                        id="models.index_name_invalid_start",
                    ),
                )
            if len(index.name) > index.max_name_length:
                errors.append(
                    PreflightResult(
                        fix="The index name '%s' cannot be longer than %d "  # noqa: UP031
                        "characters." % (index.name, index.max_name_length),
                        obj=cls,
                        id="models.index_name_too_long",
                    ),
                )
            if index.contains_expressions:
                for expression in index.expressions:
                    references.update(
                        ref[0] for ref in cls._get_expr_references(expression)
                    )
        if not (
            db_connection.features.supports_partial_indexes
            or "supports_partial_indexes" in cls.model_options.required_db_features
        ) and any(index.condition is not None for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes with conditions. "
                    "Conditions will be ignored. Silence this warning "
                    "if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_conditions_ignored",
                )
            )
        if not (
            db_connection.features.supports_covering_indexes
            or "supports_covering_indexes" in cls.model_options.required_db_features
        ) and any(index.include for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
                    "Non-key columns will be ignored. Silence this "
                    "warning if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_non_key_columns_ignored",
                )
            )
        if not (
            db_connection.features.supports_expression_indexes
            or "supports_expression_indexes" in cls.model_options.required_db_features
        ) and any(index.contains_expressions for index in cls.model_options.indexes):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support indexes on expressions. "
                    "An index won't be created. Silence this warning "
                    "if you don't care about it.",
                    warning=True,
                    obj=cls,
                    id="models.index_on_foreign_key",
                )
            )
        fields = [
            field
            for index in cls.model_options.indexes
            for field, _ in index.fields_orders
        ]
        fields += [
            include for index in cls.model_options.indexes for include in index.include
        ]
        fields += references
        errors.extend(cls._check_local_fields(fields, "indexes"))
        return errors

    @classmethod
    def _check_local_fields(
        cls, fields: Iterable[str], option: str
    ) -> list[PreflightResult]:
        from plain.models.fields.reverse_related import ManyToManyRel

        # In order to avoid hitting the relation tree prematurely, we use our
        # own fields_map instead of using get_field()
        forward_fields_map = {}
        for field in cls._model_meta._get_fields(reverse=False):
            forward_fields_map[field.name] = field
            if hasattr(field, "attname"):
                forward_fields_map[field.attname] = field

        errors = []
        for field_name in fields:
            try:
                field = forward_fields_map[field_name]
            except KeyError:
                errors.append(
                    PreflightResult(
                        fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
                        obj=cls,
                        id="models.nonexistent_field_reference",
                    )
                )
            else:
                if isinstance(field.remote_field, ManyToManyRel):
                    errors.append(
                        PreflightResult(
                            fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
                            f"ManyToManyFields are not permitted in '{option}'.",
                            obj=cls,
                            id="models.m2m_field_in_meta_option",
                        )
                    )
                elif field not in cls._model_meta.local_fields:
                    errors.append(
                        PreflightResult(
                            fix=f"'{option}' refers to field '{field_name}' which is not local to model "
                            f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
                            obj=cls,
                            id="models.non_local_field_reference",
                        )
                    )
        return errors

    @classmethod
    def _check_ordering(cls) -> list[PreflightResult]:
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?
        """

        if not cls.model_options.ordering:
            return []

        if not isinstance(cls.model_options.ordering, list | tuple):
            return [
                PreflightResult(
                    fix="'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="models.ordering_not_tuple_or_list",
                )
            ]

        errors = []
        fields = cls.model_options.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    fld = _cls._model_meta.get_field(part)
                    if fld.is_relation:
                        _cls = fld.path_infos[-1].to_meta.model
                    else:
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    if fld is None or (
                        fld.get_transform(part) is None and fld.get_lookup(part) is None
                    ):
                        errors.append(
                            PreflightResult(
                                fix="'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="models.ordering_nonexistent_field",
                            )
                        )

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        meta = cls._model_meta
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(meta.fields, meta.related_objects)
            )
        )

        invalid_fields.extend(set(fields) - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                PreflightResult(
                    fix="'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="models.ordering_nonexistent_field",
                )
            )
        return errors

    @classmethod
    def _check_long_column_names(cls) -> list[PreflightResult]:
        """
        Check that any auto-generated column names are shorter than the limits
        for the database in which the model will be created.
        """
        errors = []
        allowed_len = None

        max_name_length = db_connection.ops.max_name_length()
        if max_name_length is not None and not db_connection.features.truncates_names:
            allowed_len = max_name_length

        if allowed_len is None:
            return errors

        for f in cls._model_meta.local_fields:
            _, column_name = f.get_attname_column()

            # Check if auto-generated name for the field is too long
            # for the database.
            if (
                f.db_column is None
                and column_name is not None
                and len(column_name) > allowed_len
            ):
                errors.append(
                    PreflightResult(
                        fix=f'Autogenerated column name too long for field "{column_name}". '
                        f'Maximum length is "{allowed_len}" for the database. '
                        "Set the column name manually using 'db_column'.",
                        obj=cls,
                        id="models.autogenerated_column_name_too_long",
                    )
                )

        for f in cls._model_meta.local_many_to_many:
            # Skip nonexistent models.
            if isinstance(f.remote_field.through, str):
                continue

            # Check if auto-generated name for the M2M field is too long
            # for the database.
            for m2m in f.remote_field.through._model_meta.local_fields:
                _, rel_name = m2m.get_attname_column()
                if (
                    m2m.db_column is None
                    and rel_name is not None
                    and len(rel_name) > allowed_len
                ):
                    errors.append(
                        PreflightResult(
                            fix="Autogenerated column name too long for M2M field "
                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
                            "Use 'through' to create a separate model for "
                            "M2M and then set column_name using 'db_column'.",
                            obj=cls,
                            id="models.m2m_column_name_too_long",
                        )
                    )

        return errors

    @classmethod
    def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
        if isinstance(expr, Q):
            for child in expr.children:
                if isinstance(child, tuple):
                    lookup, value = child
                    yield tuple(lookup.split(LOOKUP_SEP))
                    yield from cls._get_expr_references(value)
                else:
                    yield from cls._get_expr_references(child)
        elif isinstance(expr, F):
            yield tuple(expr.name.split(LOOKUP_SEP))
        elif hasattr(expr, "get_source_expressions"):
            for src_expr in expr.get_source_expressions():
                yield from cls._get_expr_references(src_expr)
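
    # Illustrative sketch of what the generator above yields (assumed inputs,
    # not executed at import time; `Lower` stands in for any expression with
    # get_source_expressions()):
    #
    #   Q(amount__gte=0)    -> ("amount", "gte")
    #   F("user__email")    -> ("user", "email")
    #   Lower(F("name"))    -> ("name",)   # recursed via get_source_expressions()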

    @classmethod
    def _check_constraints(cls) -> list[PreflightResult]:
        errors = []
        if not (
            db_connection.features.supports_table_check_constraints
            or "supports_table_check_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, CheckConstraint)
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support check constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_non_db_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_partial_indexes
            or "supports_partial_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.condition is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with "
                    "conditions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_virtual_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_deferrable_unique_constraints
            or "supports_deferrable_unique_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.deferrable is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_foreign_key",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_covering_indexes
            or "supports_covering_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.include
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with non-key "
                    "columns. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_m2m_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_expression_indexes
            or "supports_expression_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints on "
                    "expressions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_self_referencing_fk",
                    warning=True,
                )
            )
        fields = set(
            chain.from_iterable(
                (*constraint.fields, *constraint.include)
                for constraint in cls.model_options.constraints
                if isinstance(constraint, UniqueConstraint)
            )
        )
        references = set()
        for constraint in cls.model_options.constraints:
            if isinstance(constraint, UniqueConstraint):
                if (
                    db_connection.features.supports_partial_indexes
                    or "supports_partial_indexes"
                    not in cls.model_options.required_db_features
                ) and isinstance(constraint.condition, Q):
                    references.update(cls._get_expr_references(constraint.condition))
                if (
                    db_connection.features.supports_expression_indexes
                    or "supports_expression_indexes"
                    not in cls.model_options.required_db_features
                ) and constraint.contains_expressions:
                    for expression in constraint.expressions:
                        references.update(cls._get_expr_references(expression))
            elif isinstance(constraint, CheckConstraint):
                if (
                    db_connection.features.supports_table_check_constraints
                    or "supports_table_check_constraints"
                    not in cls.model_options.required_db_features
                ):
                    if isinstance(constraint.check, Q):
                        references.update(cls._get_expr_references(constraint.check))
                    if any(
                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
                    ):
                        errors.append(
                            PreflightResult(
                                fix=f"Check constraint {constraint.name!r} contains "
                                f"RawSQL() expression and won't be validated "
                                f"during the model full_clean(). "
                                "Silence this warning if you don't care about it.",
                                warning=True,
                                obj=cls,
                                id="models.constraint_name_collision_autogenerated",
                            ),
                        )
        for field_name, *lookups in references:
            fields.add(field_name)
            if not lookups:
                # If it has no lookups it cannot result in a JOIN.
                continue
            try:
                field = cls._model_meta.get_field(field_name)
                if not field.is_relation or field.many_to_many or field.one_to_many:
                    continue
            except FieldDoesNotExist:
                continue
            # JOIN must happen at the first lookup.
            first_lookup = lookups[0]
            if (
                hasattr(field, "get_transform")
                and hasattr(field, "get_lookup")
                and field.get_transform(first_lookup) is None
                and field.get_lookup(first_lookup) is None
            ):
                errors.append(
                    PreflightResult(
                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                        obj=cls,
                        id="models.constraint_refers_to_joined_field",
                    )
                )
        errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors


########
# MISC #
########


def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
    """Used to unpickle Model subclasses with deferred fields."""
    if isinstance(model_id, tuple):
        model = models_registry.get_model(*model_id)
    else:
        # Backwards compat - the model was cached directly in earlier versions.
        model = model_id
    return model.__new__(model)
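
# Illustrative sketch (assumed setup, not executed here): pickling round-trips
# through __reduce__/model_unpickle, so an unpickled instance is rebuilt from
# its (package_label, object_name) pair via the models registry and then has
# its __dict__ restored by __setstate__.
#
#   import pickle
#   restored = pickle.loads(pickle.dumps(obj))  # `obj` is any saved model instance
#   restored == obj                             # True: same class and same id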


model_unpickle.__safe_for_unpickle__ = True  # type: ignore[attr-defined]